From 27cbb72730b79251f85c052c48c42febb30ab5da Mon Sep 17 00:00:00 2001 From: HaHaJeff Date: Mon, 17 Jul 2023 07:18:52 +0000 Subject: [PATCH] support read oss object concurrently --- deps/oblib/src/lib/stat/ob_latch_define.cpp | 2 +- deps/oblib/src/lib/stat/ob_latch_define.h | 6 +- deps/oblib/src/lib/wait_event/ob_wait_event.h | 5 + .../src/lib/wait_event/ob_wait_event_desc.md | 9 + src/logservice/CMakeLists.txt | 2 + .../ob_log_external_storage_handler.cpp | 409 +++++++++++++++ .../ob_log_external_storage_handler.h | 175 ++++++ .../ob_log_external_storage_io_task.cpp | 457 ++++++++++++++++ .../ob_log_external_storage_io_task.h | 205 ++++++++ unittest/logservice/CMakeLists.txt | 2 + .../test_log_external_storage_handler.cpp | 496 ++++++++++++++++++ .../test_log_external_storage_io_task.cpp | 164 ++++++ 12 files changed, 1930 insertions(+), 2 deletions(-) create mode 100644 src/logservice/ob_log_external_storage_handler.cpp create mode 100644 src/logservice/ob_log_external_storage_handler.h create mode 100644 src/logservice/ob_log_external_storage_io_task.cpp create mode 100644 src/logservice/ob_log_external_storage_io_task.h create mode 100644 unittest/logservice/test_log_external_storage_handler.cpp create mode 100644 unittest/logservice/test_log_external_storage_io_task.cpp diff --git a/deps/oblib/src/lib/stat/ob_latch_define.cpp b/deps/oblib/src/lib/stat/ob_latch_define.cpp index d619f9e9af..c4bda066b8 100644 --- a/deps/oblib/src/lib/stat/ob_latch_define.cpp +++ b/deps/oblib/src/lib/stat/ob_latch_define.cpp @@ -22,7 +22,7 @@ const ObLatchDesc OB_LATCHES[] = { #undef LATCH_DEF }; -static_assert(ARRAYSIZEOF(OB_LATCHES) == 308, "DO NOT delete latch defination"); +static_assert(ARRAYSIZEOF(OB_LATCHES) == 311, "DO NOT delete latch defination"); static_assert(ObLatchIds::LATCH_END == ARRAYSIZEOF(OB_LATCHES) - 1, "update id of LATCH_END before adding your defination"); } diff --git a/deps/oblib/src/lib/stat/ob_latch_define.h b/deps/oblib/src/lib/stat/ob_latch_define.h index 5b514bbc36..7070a17f45 100644 --- a/deps/oblib/src/lib/stat/ob_latch_define.h +++ b/deps/oblib/src/lib/stat/ob_latch_define.h @@ -319,7 +319,11 @@ LATCH_DEF(TENANT_IO_POOL_LOCK, 304, "tenant io allocator lock", LATCH_FIFO, 2000 LATCH_DEF(DISPLAY_TASKS_LOCK, 305, "display tasks lock", LATCH_READ_PREFER, 2000, 0, DISPLAY_TASKS_LOCK_WAIT, "display tasks lock") LATCH_DEF(TMP_FILE_MEM_BLOCK_LOCK, 306, "tmp file mem block lock", LATCH_FIFO, INT64_MAX, 0, TMP_FILE_MEM_BLOCK_LOCK_WAIT, "tmp file mem block lock") -LATCH_DEF(LATCH_END, 307, "latch end", LATCH_FIFO, 2000, 0, WAIT_EVENT_END, "latch end") +LATCH_DEF(LOG_EXTERNAL_STORAGE_IO_TASK_LOCK, 307, "log external storage io task condition", LATCH_FIFO, 2000, 0, LOG_EXTERNAL_STORAGE_IO_TASK_WAIT, "log external storage io task condition") +LATCH_DEF(LOG_EXTERNAL_STORAGE_HANDLER_RW_LOCK, 308, "log external storage handler rw lock", LATCH_FIFO, 2000, 0, LOG_EXTERNAL_STORAGE_HANDLER_RW_WAIT, "log external storage handler rw lock") +LATCH_DEF(LOG_EXTERNAL_STORAGE_HANDLER_LOCK, 309, "log external storage handler spin lock", LATCH_FIFO, 2000, 0, LOG_EXTERNAL_STORAGE_HANDLER_WAIT, "log external storage handler spin lock") + +LATCH_DEF(LATCH_END, 310, "latch end", LATCH_FIFO, 2000, 0, WAIT_EVENT_END, "latch end") #endif #ifndef OB_LATCH_DEFINE_H_ diff --git a/deps/oblib/src/lib/wait_event/ob_wait_event.h b/deps/oblib/src/lib/wait_event/ob_wait_event.h index 8d05700f59..238039e34b 100644 --- a/deps/oblib/src/lib/wait_event/ob_wait_event.h +++ b/deps/oblib/src/lib/wait_event/ob_wait_event.h @@ -416,6 +416,11 @@ WAIT_EVENT_DEF(STORAGE_AUTOINC_FETCH_RETRY_SLEEP, 20004, "sleep: tablet autoinc WAIT_EVENT_DEF(STORAGE_AUTOINC_FETCH_CONFLICT_SLEEP, 20005, "sleep: tablet autoinc fetch new range conflict wait", "sleep_interval", "", "", CONCURRENCY, "sleep: tablet autoinc fetch new range conflict wait", true) WAIT_EVENT_DEF(STORAGE_HA_FINISH_TRANSFER, 20006, "sleep: finish transfer sleep wait", "sleep_interval", "", "", CONCURRENCY, "sleep: finish transfer sleep wait", true) +// logservice +WAIT_EVENT_DEF(LOG_EXTERNAL_STORAGE_IO_TASK_WAIT, 20007, "latch: log external storage io task wait", "", "", "", CONCURRENCY, "latch: log external storage io task wait", true) +WAIT_EVENT_DEF(LOG_EXTERNAL_STORAGE_HANDLER_RW_WAIT, 20008, "latch: log external storage handler rw wait", "", "", "", CONCURRENCY, "latch: log external storage handler rw wait", true) +WAIT_EVENT_DEF(LOG_EXTERNAL_STORAGE_HANDLER_WAIT, 20009, "latch: log external storage handler spin wait", "", "", "", CONCURRENCY, "latch: log external storage handler spin wait", true) + WAIT_EVENT_DEF(WAIT_EVENT_END, 99999, "event end", "", "", "", OTHER, "event end", false) #endif diff --git a/deps/oblib/src/lib/wait_event/ob_wait_event_desc.md b/deps/oblib/src/lib/wait_event/ob_wait_event_desc.md index 600b4e6e54..5c1c1c6140 100644 --- a/deps/oblib/src/lib/wait_event/ob_wait_event_desc.md +++ b/deps/oblib/src/lib/wait_event/ob_wait_event_desc.md @@ -321,3 +321,12 @@ The read and write operation on hb_responses_ should be mutually exclusive. ## latch: all servers info in table lock wait The read and write operation on all_servers_info_in_table_ should be mutually exclusive. + +## thread_cond: log external storage io task lock wait +The read and write operation on ObLogExternalStorageIOTaskCtx should be mutually exclusive. + +## rwlock: log external storage io handler rw lock wait +The read and write operation on ObLogExternalStorageIOHandler should be mutually exclusive. + +## rwlock: log external storage io handler spin lock wait +The read and write operation on ObLogExternalStorageIOHandler should be mutually exclusive. diff --git a/src/logservice/CMakeLists.txt b/src/logservice/CMakeLists.txt index d8dd959fbc..a8df4466b3 100644 --- a/src/logservice/CMakeLists.txt +++ b/src/logservice/CMakeLists.txt @@ -50,6 +50,8 @@ ob_set_subtarget(ob_logservice common ob_log_flashback_service.cpp ob_net_keepalive_adapter.cpp ob_log_monitor.cpp + ob_log_external_storage_handler.cpp + ob_log_external_storage_io_task.cpp ) ob_set_subtarget(ob_logservice common_mixed diff --git a/src/logservice/ob_log_external_storage_handler.cpp b/src/logservice/ob_log_external_storage_handler.cpp new file mode 100644 index 0000000000..45c98b7ba8 --- /dev/null +++ b/src/logservice/ob_log_external_storage_handler.cpp @@ -0,0 +1,409 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include "ob_log_external_storage_handler.h" +#include "lib/string/ob_string.h" // ObString +#include "lib/stat/ob_latch_define.h" // LOG_EXTERNAL_STORAGE_HANDLER_LOCK +#include "share/rc/ob_tenant_base.h" // MTL_NEW +#include "ob_log_external_storage_io_task.h" // ObLogExternalStorageIOTask + +namespace oceanbase +{ +namespace logservice +{ +using namespace common; + +const int64_t ObLogExternalStorageHandler::CONCURRENCY_LIMIT = 128; +const int64_t ObLogExternalStorageHandler::DEFAULT_RETRY_INTERVAL = 2 * 1000; +const int64_t ObLogExternalStorageHandler::DEFAULT_TIME_GUARD_THRESHOLD = 2 * 1000; +const int64_t ObLogExternalStorageHandler::DEFAULT_PREAD_TIME_GUARD_THRESHOLD = 100 * 1000; +const int64_t ObLogExternalStorageHandler::DEFAULT_RESIZE_TIME_GUARD_THRESHOLD = 100 * 1000; +const int64_t ObLogExternalStorageHandler::CAPACITY_COEFFICIENT = 64; +const int64_t ObLogExternalStorageHandler::SINGLE_TASK_MINIMUM_SIZE = 2 * 1024 * 1024; + +ObLogExternalStorageHandler::ObLogExternalStorageHandler() + : concurrency_(-1), + capacity_(-1), + resize_rw_lock_(common::ObLatchIds::LOG_EXTERNAL_STORAGE_HANDLER_RW_LOCK), + construct_async_task_lock_(common::ObLatchIds::LOG_EXTERNAL_STORAGE_HANDLER_LOCK), + handle_adapter_(NULL), + is_running_(false), + is_inited_(false) +{} + +ObLogExternalStorageHandler::~ObLogExternalStorageHandler() +{ + destroy(); +} + +int ObLogExternalStorageHandler::init() +{ + int ret = OB_SUCCESS; + if (IS_INIT) { + ret = OB_INIT_TWICE; + CLOG_LOG(WARN, "ObLogExternalStorageHandler inited twice", KPC(this)); + } else if (NULL == (handle_adapter_ = MTL_NEW(ObLogExternalStorageIOTaskHandleAdapter, "ObLogEXTHandler"))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + CLOG_LOG(WARN, "allocate memory failed"); + } else { + concurrency_ = 0; + capacity_ = 0; + is_running_ = false; + is_inited_ = true; + CLOG_LOG(INFO, "ObLogExternalStorageHandler inits successfully", KPC(this)); + } + if (OB_FAIL(ret) && OB_INIT_TWICE != ret) { + destroy(); + } + return ret; +} + +int ObLogExternalStorageHandler::start(const int64_t concurrency) +{ + int ret = OB_SUCCESS; + WLockGuard guard(resize_rw_lock_); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + CLOG_LOG(WARN, "ObLogExternalStorageHandler not inited", K(concurrency), KPC(this)); + } else if (is_running_) { + CLOG_LOG(WARN, "ObLogExternalStorageHandler has run", K(concurrency), KPC(this)); + } else if (!is_valid_concurrency_(concurrency)) { + ret = OB_INVALID_ARGUMENT; + CLOG_LOG(WARN, "invalid argument", K(concurrency), KPC(this)); + } else if (0 != concurrency + && OB_FAIL(ObSimpleThreadPool::init( + concurrency, CAPACITY_COEFFICIENT * concurrency, "ObLogEXTTP", MTL_ID()))) { + CLOG_LOG(WARN, "invalid argument", K(concurrency), KPC(this)); + } else { + concurrency_ = concurrency; + capacity_ = CAPACITY_COEFFICIENT * concurrency; + is_running_ = true; + } + return ret; +} + +void ObLogExternalStorageHandler::stop() +{ + WLockGuard guard(resize_rw_lock_); + is_running_ = false; + ObSimpleThreadPool::stop(); +} + +void ObLogExternalStorageHandler::wait() +{ + WLockGuard guard(resize_rw_lock_); + ObSimpleThreadPool::wait(); +} + +void ObLogExternalStorageHandler::destroy() +{ + CLOG_LOG_RET(WARN, OB_SUCCESS, "ObLogExternalStorageHandler destroy"); + is_inited_ = false; + stop(); + wait(); + ObSimpleThreadPool::destroy(); + concurrency_ = -1; + capacity_ = -1; + MTL_DELETE(ObLogExternalStorageIOTaskHandleIAdapter, "ObLogEXTHandler", handle_adapter_); + handle_adapter_ = NULL; +} + +int ObLogExternalStorageHandler::resize(const int64_t new_concurrency, + const int64_t timeout_us) +{ + int ret = OB_SUCCESS; + ObTimeGuard time_guard("resize thread pool", DEFAULT_RESIZE_TIME_GUARD_THRESHOLD); + WLockGuardTimeout guard(resize_rw_lock_, timeout_us, ret); + time_guard.click("after hold lock"); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + CLOG_LOG(WARN, "ObLogExternalStorageHandler not inited", KPC(this), K(new_concurrency), K(timeout_us)); + } else if (!is_running_) { + ret = OB_NOT_RUNNING; + CLOG_LOG(WARN, "ObLogExternalStorageHandler not running", KPC(this), K(new_concurrency), K(timeout_us)); + } else if (!is_valid_concurrency_(new_concurrency) || 0 > timeout_us) { + ret = OB_INVALID_ARGUMENT; + CLOG_LOG(WARN, "invalid arguments", KPC(this), K(new_concurrency), K(timeout_us)); + // hold lock failed + } else if (OB_FAIL(ret)) { + CLOG_LOG(WARN, "hold lock failed", KPC(this), K(new_concurrency), K(timeout_us)); + } else if (new_concurrency == concurrency_) { + CLOG_LOG(TRACE, "no need resize", KPC(this), K(new_concurrency)); + } else { + destroy_and_init_new_thread_pool_(new_concurrency); + time_guard.click("after create new thread pool"); + concurrency_ = new_concurrency; + capacity_ = CAPACITY_COEFFICIENT * new_concurrency; + CLOG_LOG(INFO, "ObLogExternalStorageHandler resize success", K(new_concurrency)); + } + return ret; +} + +int ObLogExternalStorageHandler::pread(const common::ObString &uri, + const common::ObString &storage_info, + const int64_t offset, + char *buf, + const int64_t read_buf_size, + int64_t &real_read_size) +{ + int ret = OB_SUCCESS; + int64_t async_task_count = 0; + ObTimeGuard time_guard("slow pread", DEFAULT_PREAD_TIME_GUARD_THRESHOLD); + ObLogExternalStorageIOTaskCtx *async_task_ctx = NULL; + int64_t file_size = palf::PALF_PHY_BLOCK_SIZE; + int64_t real_read_buf_size = 0; + RLockGuard guard(resize_rw_lock_); + time_guard.click("after hold by lock"); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + CLOG_LOG(WARN, "ObLogExternalStorageHandler not init", K(uri), K(storage_info), K(offset), KP(buf), K(read_buf_size)); + } else if (!is_running_) { + ret = OB_NOT_RUNNING; + CLOG_LOG(WARN, "ObLogExternalStorageHandler not running", K(uri), K(storage_info), K(offset), KP(buf), K(read_buf_size)); + } else if (uri.empty() || storage_info.empty() || 0 > offset || NULL == buf || 0 >= read_buf_size) { + ret = OB_INVALID_ARGUMENT; + CLOG_LOG(WARN, "ObLogExternalStorageHandler invalid argument", K(uri), K(storage_info), K(offset), KP(buf), K(read_buf_size)); + } else if (offset >= file_size) { + ret = OB_INVALID_ARGUMENT; + CLOG_LOG(WARN, "read position lager than file size, invalid argument", K(file_size), K(offset), K(uri)); + } else if (FALSE_IT(time_guard.click("after get file size"))) { + // NB: limit read size. + } else if (FALSE_IT(real_read_buf_size = std::min(file_size - offset, read_buf_size))) { + } else if (OB_FAIL(construct_async_tasks_and_push_them_into_thread_pool_( + uri, storage_info, offset, buf, real_read_buf_size, real_read_size, async_task_ctx))) { + CLOG_LOG(WARN, "construct_async_task_and_push_them_into_thread_pool_ failed", K(uri), K(storage_info), + K(offset), KP(buf), K(read_buf_size)); + } else if (FALSE_IT(time_guard.click("after construct async tasks"))) { + } else if (OB_FAIL(wait_async_tasks_finished_(async_task_ctx))) { + CLOG_LOG(WARN, "wait_async_tasks_finished_ failed", K(uri), K(storage_info), + K(offset), KP(buf), K(read_buf_size), KPC(async_task_ctx)); + } else if (FALSE_IT(time_guard.click("after wait async tasks"))) { + } else { + // if there is a failure of any async task, return the error of it, otherwise, return OB_SUCCESS. + ret = async_task_ctx->get_ret_code(); + if (OB_FAIL(ret)) { + CLOG_LOG(WARN, "pread finished", K(time_guard), K(uri), K(storage_info), K(offset), K(read_buf_size), + K(real_read_size)); + } else { + CLOG_LOG(TRACE, "pread finished", K(time_guard), K(uri), K(storage_info), K(offset), K(read_buf_size), + K(real_read_size)); + } + } + // FIXME: consider use shared ptr. + if (NULL != async_task_ctx) { + MTL_DELETE(ObLogExternalStorageIOTaskCtx, "ObLogEXTHandler", async_task_ctx); + async_task_ctx = NULL; + } + return ret; +} + +void ObLogExternalStorageHandler::handle(void *task) +{ + int ret = OB_SUCCESS; + ObLogExternalStorageIOTask *io_task = reinterpret_cast(task); + if (OB_ISNULL(io_task)) { + ret = OB_ERR_UNEXPECTED; + CLOG_LOG(WARN, "io_task is nullptr, unexpected error!!!", KP(io_task), KPC(this)); + } else if (OB_FAIL(io_task->do_task())) { + CLOG_LOG(WARN, "do_task failed", KP(io_task), KPC(this)); + } else { + CLOG_LOG(TRACE, "do_task success", KP(io_task), KPC(this)); + } + if (OB_NOT_NULL(io_task)) { + MTL_DELETE(ObLogExternalStorageIOTask, "ObLogEXTHandler", io_task); + } +} + +bool ObLogExternalStorageHandler::is_valid_concurrency_(const int64_t concurrency) const +{ + return 0 <= concurrency && CONCURRENCY_LIMIT >= concurrency; +} + +int64_t ObLogExternalStorageHandler::get_async_task_count_(const int64_t total_size) const +{ + // TODO by runlin: consider free async thread num. + int64_t minimum_async_task_size = SINGLE_TASK_MINIMUM_SIZE; + int64_t minimum_async_task_count = 1; + int64_t async_task_count = std::max(minimum_async_task_count, + std::min(concurrency_, + (total_size + minimum_async_task_size - 1)/minimum_async_task_size)); + return async_task_count; +} + +int ObLogExternalStorageHandler::construct_async_tasks_and_push_them_into_thread_pool_( + const common::ObString &uri, + const common::ObString &storage_info, + const int64_t offset, + char *read_buf, + const int64_t read_buf_size, + int64_t &real_read_size, + ObLogExternalStorageIOTaskCtx *&async_task_ctx) +{ + int ret = OB_SUCCESS; + int64_t async_task_count = get_async_task_count_(read_buf_size); + int64_t async_task_size = (read_buf_size + async_task_count - 1) / async_task_count; + int64_t remained_task_count = async_task_count; + int64_t remained_total_size = read_buf_size; + real_read_size = 0; + async_task_ctx = NULL; + if (NULL == (async_task_ctx = MTL_NEW(ObLogExternalStorageIOTaskCtx, "ObLogEXTHandler"))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + CLOG_LOG(WARN, "allocate memory failed", KP(async_task_ctx)); + } else if (OB_FAIL(async_task_ctx->init(async_task_count))) { + CLOG_LOG(WARN, "init ObLogExternalStorageIOTaskCtx failed", KP(async_task_ctx), K(async_task_count)); + } else { + CLOG_LOG(TRACE, "begin construct async tasks", K(async_task_count), K(async_task_size), + K(remained_task_count), K(remained_total_size)); + int64_t curr_read_offset = offset; + int64_t curr_read_buf_pos = 0; + // construct async task and push it into thread pool, last async task will be executed in current thread. + ObLogExternalStorageIOTask *last_io_task = NULL; + int64_t curr_task_idx = 0; + while (remained_task_count > 0) { + ObLogExternalStoragePreadTask *pread_task = NULL; + int64_t async_task_read_buf_size = std::min(remained_total_size, async_task_size); + construct_async_read_task_(uri, storage_info, curr_read_offset, read_buf + curr_read_buf_pos, + async_task_read_buf_size, real_read_size, curr_task_idx, + async_task_ctx, pread_task); + ++curr_task_idx; + curr_read_offset += async_task_read_buf_size; + curr_read_buf_pos += async_task_read_buf_size; + remained_total_size -= async_task_read_buf_size; + last_io_task = pread_task; + + CLOG_LOG(TRACE, "construct async tasks idx success", K(curr_task_idx), K(async_task_count), K(async_task_size), + K(remained_task_count), K(remained_total_size)); + + // NB: use current thread to execute last io task. + if (--remained_task_count > 0) { + push_async_task_into_thread_pool_(pread_task); + } + } + // defense code, last_io_task must not be NULL. + if (NULL != last_io_task) { + (void)last_io_task->do_task(); + } + } + return ret; +} + +int ObLogExternalStorageHandler::wait_async_tasks_finished_( + ObLogExternalStorageIOTaskCtx *async_task_ctx) +{ + int ret = OB_SUCCESS; + const int64_t DEFAULT_WAIT_US = 50 * 1000; + int64_t print_log_interval = OB_INVALID_TIMESTAMP; + // if async_task_ctx->wait return OB_SUCCESS, means there is no flying task. + // if async_task_ctx->wait return error except OB_TIMEOUT, we can not return + // the errno until async_task_ctx has no flying task. + while (OB_FAIL(async_task_ctx->wait(DEFAULT_WAIT_US)) + && async_task_ctx->has_flying_async_task()) { + if (palf::palf_reach_time_interval(500*1000, print_log_interval)) { + CLOG_LOG(WARN, "wait ObLogExternalStorageIOTaskCtx failed", KPC(async_task_ctx)); + } + } + // even if wait async_task_ctx failed, there is no flying async task at here. + if (OB_FAIL(ret)) { + CLOG_LOG(WARN, "wait ObLogExternalStorageIOTaskCtx failed", KPC(async_task_ctx)); + } + return ret; +} + +void ObLogExternalStorageHandler::construct_async_read_task_( + const common::ObString &uri, + const common::ObString &storage_info, + const int64_t offset, + char *read_buf, + const int64_t read_buf_size, + int64_t &real_read_size, + const int64_t task_idx, + ObLogExternalStorageIOTaskCtx *async_task_ctx, + ObLogExternalStoragePreadTask *&pread_task) +{ + int ret = OB_SUCCESS; + ObTimeGuard time_guard("construct pread task", DEFAULT_TIME_GUARD_THRESHOLD); + int64_t print_log_interval = OB_INVALID_TIMESTAMP; + do { + RunningStatus *running_status = NULL; + if (OB_FAIL(async_task_ctx->get_running_status(task_idx, running_status))) { + CLOG_LOG(ERROR, "unexpected error!!!", K(task_idx), KP(running_status), KPC(async_task_ctx)); + } else if (NULL == (pread_task = MTL_NEW(ObLogExternalStoragePreadTask, + "ObLogEXTHandler", + uri, storage_info, + running_status, + async_task_ctx, + handle_adapter_, + offset, read_buf_size, + read_buf, real_read_size))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + if (palf::palf_reach_time_interval(1*1000*1000, print_log_interval)) { + CLOG_LOG(WARN, "allocate memory failed", K(task_idx), KP(running_status), KPC(async_task_ctx)); + } + } else { + CLOG_LOG(TRACE, "construct_async_read_task_ success", KPC(pread_task)); + } + if (OB_FAIL(ret)) { + usleep(DEFAULT_RETRY_INTERVAL); + } + } while (OB_FAIL(ret)); +} + +void ObLogExternalStorageHandler::push_async_task_into_thread_pool_( + ObLogExternalStorageIOTask *io_task) +{ + int ret = OB_SUCCESS; + ObTimeGuard time_guard("push pread task", DEFAULT_TIME_GUARD_THRESHOLD); + int64_t print_log_interval = OB_INVALID_TIMESTAMP; + do { + if (OB_FAIL(push(io_task))) { + if (palf::palf_reach_time_interval(1*1000*1000, print_log_interval)) { + CLOG_LOG(WARN, "push task into thread pool failed"); + } + } + if (OB_FAIL(ret)) { + usleep(DEFAULT_RETRY_INTERVAL); + } + } while (OB_FAIL(ret)); +} + +void ObLogExternalStorageHandler::destroy_and_init_new_thread_pool_( + const int64_t new_concurrency) +{ + int ret = OB_SUCCESS; + ObTimeGuard time_guard("resize impl", 10 * 1000); + // destroy currenty thread pool + ObSimpleThreadPool::stop(); + time_guard.click("stop old thread pool"); + ObSimpleThreadPool::wait(); + time_guard.click("wait old thread pool"); + ObSimpleThreadPool::destroy(); + time_guard.click("destroy old thread pool"); + + do { + if (0 != new_concurrency + && OB_FAIL(ObSimpleThreadPool::init( + new_concurrency, CAPACITY_COEFFICIENT * new_concurrency, "ObLogEXTTP", MTL_ID()))) { + CLOG_LOG(WARN, "init ObSimpleThreadPool failed", K(new_concurrency), KPC(this)); + } else { + concurrency_ = new_concurrency; + capacity_ = CAPACITY_COEFFICIENT * new_concurrency; + } + if (OB_FAIL(ret)) { + usleep(DEFAULT_RETRY_INTERVAL); + } + } while (OB_FAIL(ret)); + time_guard.click("creat enew thread pool"); + CLOG_LOG_RET(WARN, OB_SUCCESS, "destroy_and_init_new_thread_pool_ success", K(time_guard), KPC(this)); +} + +} // end namespace logservice +} // end namespace oceanbase diff --git a/src/logservice/ob_log_external_storage_handler.h b/src/logservice/ob_log_external_storage_handler.h new file mode 100644 index 0000000000..a96002cf8f --- /dev/null +++ b/src/logservice/ob_log_external_storage_handler.h @@ -0,0 +1,175 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEABASE_LOGSERVICE_OB_LOG_EXTERNAL_STORAGE_HANDLER_H_ +#define OCEABASE_LOGSERVICE_OB_LOG_EXTERNAL_STORAGE_HANDLER_H_ +#include // int64_t +#include "lib/utility/ob_macro_utils.h" // DISALLOW_COPY_AND_ASSIGN +#include "lib/utility/ob_print_utils.h" // TO_STRING_KV +#include "lib/thread/ob_simple_thread_pool.h" // ObSimpleThreadPool +#include "lib/lock/ob_spin_lock.h" // ObSpinLock +#include "lib/lock/ob_tc_rwlock.h" // ObRWLock +#include "share/ob_errno.h" // errno +namespace oceanbase +{ +namespace common +{ +class ObString; +} +namespace logservice +{ +class ObLogExternalStorageIOTaskHandleIAdapter; +class ObLogExternalStorageIOTask; +class ObLogExternalStoragePreadTask; +class ObLogExternalStorageIOTaskCtx; +class ObLogExternalStorageHandler : public ObSimpleThreadPool { +public: + ObLogExternalStorageHandler(); + ~ObLogExternalStorageHandler(); + +public: + // @brief: Initializing ObLogExternalStorageHandler + // @return value: + // OB_SUCCESS, initializing successfully. + // OB_NOT_INIT + // OB_INIT_TWICE + // OB_ALLOCATE_MEMORY_FAILED + int init(); + + // @brief: Starting thread pool with the specified concurrency. + // @return value: + // OB_SUCCESS + // OB_ALLOCATE_MEMORY_FAILED, allocate memory failed. + // OB_INVALID_ARGUMENT, invalid argument, concurrency must be greater than or equal to 0 and less than or equal 128. + // + // NB: if concurrency is 0, means all request will be executed in current thread. + int start(const int64_t concurrency); + + // @brief: Stoping thread pool. + void stop(); + + // @brief: Waiting flying task finished. + void wait(); + + // @brief: Destroying thread pool. + void destroy(); + + // Thread safe + // @brief: Resizing the concurrency of thread pool, the new pread operation will be blocked. + // @param[in]: new_concurrency, expected concurrency of thread pool + // @param[in]: timeout_us. + // @return value + // OB_SUCCESS + // OB_EAGAIN, lock timeout + // OB_NOT_INIT + // OB_NOT_RUNNING + int resize(const int64_t new_concurrency, + const int64_t timeout_us = INT64_MAX); + + // NB: Thread safe and synchronous interface. + // @brief: Reading up to count bytes from to a uri with storage info at a given offset. + // @param[in]: uri, a unique sequence of characters that identifies a logical or physical resource used by + // object storage system(e.g.: just like this 'oss://xxx/xxx/...'). + // @param[in]: storage_info, the meta info used to access object storage system(e.g.: just like this + // 'endpoint&access_id&access_key'). + // @param[in]: offset, the given offset which want to read. + // @param[in]: read_buf, the read buffer. + // @param[in]: read_buf_size, the maximum read length. + // @param[out]: real_read_size, the really read length. + // @return value: + // OB_SUCCESS, read successfully. + // OB_INVALID_ARGUMENT, invalid argument. + // OB_ALLOCATE_MEMORY_FAILED, allocate memory failed. + // OB_BACKUP_PERMISSION_DENIED, permission denied. + // OB_BACKUP_FILE_NOT_EXIST, uri not exist. + // OB_OSS_ERROR, oss error. + // OB_NOT_INIT + // OB_NOT_RUNNING + // + // NB: + // 1. A fixed number of asynchronous tasks will be generated based on the read length, and the minimum + // read length of single asynchronous task is 2MB. + // 2. Don't generate asynchronous task when concurrency_ is 0, and execute pread in current thread. + // + // Recommend: + // A maximum of N asynchronous tasks can be generated for a single pread operation(i.e. N = + // MIN(read_buf_size/2M, concurrency)), we recommend the total length of concurrent read + // requests not exceeded 2M*concurrency, in this way, pread can get better performance. + // + int pread(const common::ObString &uri, + const common::ObString &storage_info, + const int64_t offset, + char *read_buf, + const int64_t read_buf_size, + int64_t &real_read_size); + + void handle(void *task) override final; + + TO_STRING_KV(K_(concurrency), K_(capacity), K_(is_running), K_(is_inited), KP(handle_adapter_), KP(this)); +private: + // CONCURRENCY LIMIT is 128. + // NB: max thread number of ObSimpleThreadPool is 256. + static const int64_t CONCURRENCY_LIMIT; + static const int64_t DEFAULT_RETRY_INTERVAL; + static const int64_t DEFAULT_TIME_GUARD_THRESHOLD; + static const int64_t DEFAULT_PREAD_TIME_GUARD_THRESHOLD; + static const int64_t DEFAULT_RESIZE_TIME_GUARD_THRESHOLD; + static const int64_t CAPACITY_COEFFICIENT; + static const int64_t SINGLE_TASK_MINIMUM_SIZE; + +private: + + bool is_valid_concurrency_(const int64_t concurrency) const; + + int64_t get_async_task_count_(const int64_t total_size) const; + int construct_async_tasks_and_push_them_into_thread_pool_( + const common::ObString &uri, + const common::ObString &storage_info, + const int64_t offset, + char *read_buf, + const int64_t read_buf_size, + int64_t &real_read_size, + ObLogExternalStorageIOTaskCtx *&async_task_ctx); + + int wait_async_tasks_finished_(ObLogExternalStorageIOTaskCtx *async_task_ctx); + + void construct_async_read_task_(const common::ObString &uri, + const common::ObString &storage_info, + const int64_t offset, + char *read_buf, + const int64_t read_buf_size, + int64_t &real_read_size, + const int64_t task_idx, + ObLogExternalStorageIOTaskCtx *async_task_ctx, + ObLogExternalStoragePreadTask *&pread_task); + + void push_async_task_into_thread_pool_(ObLogExternalStorageIOTask *io_task); + + void destroy_and_init_new_thread_pool_(const int64_t concurrency); + +private: + typedef common::RWLock RWLock; + typedef RWLock::RLockGuard RLockGuard; + typedef RWLock::WLockGuard WLockGuard; + typedef RWLock::WLockGuardWithTimeout WLockGuardTimeout; + int64_t concurrency_; + int64_t capacity_; + RWLock resize_rw_lock_; + ObSpinLock construct_async_task_lock_; + ObLogExternalStorageIOTaskHandleIAdapter *handle_adapter_; + bool is_running_; + bool is_inited_; + DISALLOW_COPY_AND_ASSIGN(ObLogExternalStorageHandler); +}; +} // end namespace logservice +} // end namespace oceanbase +#endif diff --git a/src/logservice/ob_log_external_storage_io_task.cpp b/src/logservice/ob_log_external_storage_io_task.cpp new file mode 100644 index 0000000000..08fb18783e --- /dev/null +++ b/src/logservice/ob_log_external_storage_io_task.cpp @@ -0,0 +1,457 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ +#include "ob_log_external_storage_io_task.h" +#include "lib/string/ob_string.h" // ObString +#include "lib/restore/ob_object_device.h" // ObObjectDevice +#include "share/ob_device_manager.h" // ObDeviceManager +namespace oceanbase +{ +using namespace share; +namespace common +{ +extern const char *OB_STORAGE_ACCESS_TYPES_STR[]; +} +namespace logservice +{ +RunningStatus::RunningStatus() +{ + reset(); +} + +RunningStatus::~RunningStatus() +{ + reset(); +} + +void RunningStatus::reset() +{ + ret_ = -1; + main_thread_id_ = -1; + thread_id_ = -1; + logical_thread_id_ = -1; + status_ = EnumRunningStatus::INVALID_STATUS; +} + +ObLogExternalStorageIOTaskCtx::ObLogExternalStorageIOTaskCtx() + : flying_task_count_(-1), total_task_count_(-1), running_status_(NULL), is_inited_(false) +{} + +ObLogExternalStorageIOTaskCtx::~ObLogExternalStorageIOTaskCtx() +{ + destroy(); +} + +int ObLogExternalStorageIOTaskCtx::init(const int64_t total_task_count) +{ + int ret = OB_SUCCESS; + if (IS_INIT) { + ret = OB_INIT_TWICE; + CLOG_LOG(WARN, "ObLogExternalStorageIOTaskCtx inited twice", K(total_task_count), KPC(this)); + } else if (0 >= total_task_count) { + ret = OB_INVALID_ARGUMENT; + CLOG_LOG(WARN, "invalid argument", K(total_task_count)); + } else if (OB_FAIL(condition_.init(ObWaitEventIds::LOG_EXTERNAL_STORAGE_IO_TASK_WAIT))) { + CLOG_LOG(WARN, "init thread conditions failed", K(total_task_count)); + // set 'total_task_count_' at here + } else if (OB_FAIL(construct_running_status_(total_task_count))) { + CLOG_LOG(WARN, "construct_running_status_ failed", K(total_task_count)); + } else { + flying_task_count_ = total_task_count; + is_inited_ = true; + CLOG_LOG(INFO, "ObLogExternalStorageIOTaskCtx init success", KPC(this), K(total_task_count)); + } + if (OB_FAIL(ret) && OB_INIT_TWICE != ret) { + destroy(); + } + return ret; +} + +void ObLogExternalStorageIOTaskCtx::destroy() +{ + is_inited_ = false; + flying_task_count_ = -1; + deconstruct_running_status_(); + CLOG_LOG(INFO, "ObLogExternalStorageIOTaskCtx destroy success", KPC(this)); +} + +void ObLogExternalStorageIOTaskCtx::signal() +{ + ObThreadCondGuard guard(condition_); + if (0 >= flying_task_count_) { + CLOG_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "unexpected ERROR, flying_task_count_ is less than 0", KPC(this)); + } else { + flying_task_count_--; + } + if (0 == flying_task_count_) { + condition_.broadcast(); + } +} + +int ObLogExternalStorageIOTaskCtx::wait(const int64_t timeout_us) +{ + int ret = OB_SUCCESS; + ObThreadCondGuard guard(condition_); + while (OB_SUCC(ret) && flying_task_count_ > 0) { + ret = condition_.wait_us(timeout_us); + } + return ret; +} + +int ObLogExternalStorageIOTaskCtx::get_running_status(const int64_t idx, + RunningStatus *&running_status) const +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + CLOG_LOG(WARN, "ObLogExternalStorageIOTaskCtx not init", KPC(this)); + } else if (0 > idx && total_task_count_ <= idx) { + ret = OB_INVALID_ARGUMENT; + CLOG_LOG(WARN, "invalid argument", KPC(this), K(idx)); + } else { + running_status = running_status_ + idx; + running_status->logical_thread_id_ = idx; + running_status->main_thread_id_ = gettid(); + CLOG_LOG(TRACE, "get_running_status success", KPC(this), K(idx), KP(running_status), KPC(running_status)); + } + return ret; +} + +int ObLogExternalStorageIOTaskCtx::get_ret_code() const +{ + int ret = OB_SUCCESS; + for (int64_t i = 0; i < total_task_count_; i++) { + if (OB_SUCCESS != running_status_[i].ret_) { + ret = running_status_[i].ret_; + CLOG_LOG(WARN, "asyn task execute failed", KPC(this)); + break; + } + } + return ret; +} + +bool ObLogExternalStorageIOTaskCtx::has_flying_async_task() const +{ + ObThreadCondGuard guard(condition_); + return flying_task_count_ > 0; +} + +DEF_TO_STRING(ObLogExternalStorageIOTaskCtx) +{ + int64_t pos = 0; + J_OBJ_START(); + J_KV(K_(is_inited), + K_(total_task_count), + K_(flying_task_count), + KP(running_status_)); + J_COMMA(); + if (OB_NOT_NULL(running_status_)) { + J_ARRAY_START(); + for (int64_t i = 0; i < total_task_count_; i++) { + J_OBJ_START(); + J_KV("idx", i); + J_COMMA(); + J_KV("data", running_status_[i]); + J_OBJ_END(); + } + J_ARRAY_END(); + } + J_OBJ_END(); + return pos; +} + +int ObLogExternalStorageIOTaskCtx::construct_running_status_(const int64_t total_task_count) +{ + int ret = OB_SUCCESS; + if (NULL == (running_status_ = + reinterpret_cast(mtl_malloc(sizeof(RunningStatus)*total_task_count, "ObLogEXT")))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + CLOG_LOG(WARN, "allocate memory failed", K(total_task_count), KP(running_status_)); + } else { + total_task_count_ = total_task_count; + for (int64_t i = 0; i < total_task_count; i++) { + RunningStatus *ptr = running_status_ + i; + new (ptr)RunningStatus; + } + } + return ret; +} + +void ObLogExternalStorageIOTaskCtx::deconstruct_running_status_() +{ + if (OB_NOT_NULL(running_status_)) { + for (int64_t i = 0; i < total_task_count_; i++) { + RunningStatus *ptr = running_status_ + i; + ptr->~RunningStatus(); + } + mtl_free(running_status_); + running_status_ = NULL; + total_task_count_ = -1; + CLOG_LOG(TRACE, "deconstruct_running_status_ success", KPC(this)); + } +} + +int get_and_init_io_device(const ObString &uri, + const ObString &storage_info, + ObIODevice *&io_device) +{ + int ret = OB_SUCCESS; + ObIODOpts opts; + ObIODOpt opt; + opts.opts_ = &opt; + opts.opt_cnt_ = 1; + opt.key_ = "storage_info"; + opt.value_.value_str = storage_info.ptr(); + if (OB_FAIL(ObDeviceManager::get_instance().get_device(storage_info, uri, io_device))) { + CLOG_LOG(WARN, "get_device from ObDeviceManager failed", K(storage_info), K(uri), KP(io_device)); + } else if (OB_FAIL(io_device->start(opts))) { + CLOG_LOG(WARN, "start io device failed", K(storage_info), K(uri), KP(io_device)); + } else { + CLOG_LOG(TRACE, "get_io_device success", K(uri), K(storage_info), KP(io_device)); + } + return ret; +} + +void release_io_device(ObIODevice *&io_device) +{ + if (NULL != io_device) { + (void)ObDeviceManager::get_instance().release_device(io_device); + } + io_device = NULL; +} + +enum class OPEN_FLAG{ + INVALID_FLAG = 0, + READ_FLAG = 1, + WRITE_FLAG = 2, + MAX_FLAG = 3 +}; + +int convert_to_storage_access_type(const OPEN_FLAG &open_flag, + ObStorageAccessType &storage_access_type) +{ + int ret = OB_SUCCESS; + if (OPEN_FLAG::READ_FLAG == open_flag) { + storage_access_type = ObStorageAccessType::OB_STORAGE_ACCESS_READER; + } else { + ret = OB_NOT_SUPPORTED; + CLOG_LOG(WARN, "not supported flag", K(open_flag)); + } + return ret; +} + +int open_io_fd(const ObString &uri, + const OPEN_FLAG open_flag, + ObIODevice *io_device, + ObIOFd &io_fd) +{ + int ret = OB_SUCCESS; + ObIODOpt opt; + ObIODOpts iod_opts; + iod_opts.opts_ = &opt; + iod_opts.opt_cnt_ = 1; + // TODO by runlin: support write + ObStorageAccessType access_type; + if (OB_FAIL(convert_to_storage_access_type(open_flag, access_type))) { + CLOG_LOG(WARN, "convert_to_storage_access_type failed", K(open_flag)); + } else if (FALSE_IT(opt.set("AccessType", OB_STORAGE_ACCESS_TYPES_STR[access_type]))) { + // flag=-1 and mode=0 are invalid, because ObObjectDevice does not use flag and mode; + } else if (OB_FAIL(io_device->open(uri.ptr(), -1, 0, io_fd, &iod_opts))) { + CLOG_LOG(WARN, "open fd failed", K(uri), K(open_flag)); + } else { + CLOG_LOG(TRACE, "open fd success", K(uri), K(open_flag)); + } + return ret; +} + +int close_io_fd(ObIODevice *io_device, + const ObIOFd &io_fd) +{ + int ret = OB_SUCCESS; + if (NULL == io_device || !io_fd.is_valid()) { + ret = OB_INVALID_ARGUMENT; + CLOG_LOG(WARN, "io device is empty"); + } else if (OB_FAIL(io_device->close(io_fd))) { + CLOG_LOG(WARN, "fail to close fd!", K(io_fd)); + } else { + CLOG_LOG(TRACE, "close_io_fd success", KP(io_device), K(io_fd)); + } + return ret; +} + +ObLogExternalStorageIOTaskHandleAdapter::ObLogExternalStorageIOTaskHandleAdapter() {} + +ObLogExternalStorageIOTaskHandleAdapter::~ObLogExternalStorageIOTaskHandleAdapter() {} + +int ObLogExternalStorageIOTaskHandleAdapter::exist(const ObString &uri, + const ObString &storage_info, + bool &exist) +{ + int ret = OB_SUCCESS; + exist = false; + ObIODevice *io_device = NULL; + if (OB_FAIL(get_and_init_io_device(uri, storage_info, io_device))) { + CLOG_LOG(WARN, "get_io_device failed", K(uri), K(storage_info), KP(io_device)); + } else if (OB_FAIL(io_device->exist(uri.ptr(), exist))) { + CLOG_LOG(WARN, "exist failed", K(uri), K(storage_info), KP(io_device), K(exist)); + } else { + CLOG_LOG(TRACE, "exist success", K(uri), K(storage_info), KP(io_device), K(exist)); + } + release_io_device(io_device); + return ret; +} + +int ObLogExternalStorageIOTaskHandleAdapter::get_file_size(const ObString &uri, + const ObString &storage_info, + int64_t &file_size) +{ + int ret = OB_SUCCESS; + file_size = 0; + ObIODFileStat file_stat; + ObIODevice *io_device = NULL; + if (OB_FAIL(get_and_init_io_device(uri, storage_info, io_device))) { + CLOG_LOG(WARN, "get_io_device failed", K(uri), K(storage_info), KP(io_device)); + } else if (OB_FAIL(io_device->stat(uri.ptr(), file_stat))) { + CLOG_LOG(WARN, "stat io deveice failed", K(uri)); + } else { + file_size = file_stat.size_; + CLOG_LOG(TRACE, "get_file_size success", K(uri), K(storage_info), KP(io_device), K(file_size)); + } + release_io_device(io_device); + return ret; +} + +int ObLogExternalStorageIOTaskHandleAdapter::pread(const ObString &uri, + const ObString &storage_info, + const int64_t offset, + char *buf, + const int64_t read_buf_size, + int64_t &real_read_size) +{ + ObTimeGuard time_guard("oss pread", 200 * 1000); + int ret = OB_SUCCESS; + real_read_size = 0; + ObIODevice *io_device = NULL; + ObIOFd io_fd; + if (OB_FAIL(get_and_init_io_device(uri, storage_info, io_device))) { + CLOG_LOG(WARN, "get_io_device failed", K(io_fd), K(offset), K(read_buf_size), KP(buf), K(real_read_size)); + } else if (FALSE_IT(time_guard.click("after get_io_device"))) { + } else if (OB_FAIL(open_io_fd(uri, OPEN_FLAG::READ_FLAG, io_device, io_fd))) { + } else if (FALSE_IT(time_guard.click("after open_io_fd"))) { + CLOG_LOG(WARN, "open_io_fd failed", K(io_fd), K(offset), K(read_buf_size), KP(buf), K(real_read_size)); + } else if (OB_FAIL(io_device->pread(io_fd, offset, read_buf_size, buf, real_read_size))) { + CLOG_LOG(WARN, "pread failed", K(io_fd), K(offset), K(read_buf_size), KP(buf), K(real_read_size)); + } else if (FALSE_IT(time_guard.click("after pread"))) { + } else { + CLOG_LOG(TRACE, "pread success", K(time_guard), K(io_fd), K(offset), K(read_buf_size), KP(buf), K(real_read_size)); + } + (void)close_io_fd(io_device, io_fd); + (void)release_io_device(io_device); + return ret; +} + +ObLogExternalStorageIOTask::ObLogExternalStorageIOTask(const ObString &uri, + const ObString &storage_info, + RunningStatus *running_status, + ObLogExternalStorageIOTaskCtx *io_task_ctx, + ObLogExternalStorageIOTaskHandleIAdapter *adapter) + : uri_ob_str_(OB_MAX_URI_LENGTH, 0, uri_str_), + storage_info_ob_str_(OB_MAX_BACKUP_STORAGE_INFO_LENGTH, 0, storage_info_str_), + generate_ts_(OB_INVALID_TIMESTAMP), + do_task_ts_(OB_INVALID_TIMESTAMP), + running_status_(running_status), + io_task_ctx_(io_task_ctx), + type_(ObLogExternalStorageIOTaskType::INVALID_TYPE), + adapter_(adapter) +{ + memset(uri_str_, '\0', OB_MAX_URI_LENGTH); + memset(storage_info_str_, '\0', OB_MAX_BACKUP_STORAGE_INFO_LENGTH); + uri_ob_str_.write(uri.ptr(), uri.length()); + storage_info_ob_str_.write(storage_info.ptr(), storage_info.length()); + generate_ts_ = ObTimeUtility::current_time(); +} + +ObLogExternalStorageIOTask::~ObLogExternalStorageIOTask() +{ + memset(uri_str_, '\0', OB_MAX_URI_LENGTH); + memset(storage_info_str_, '\0', OB_MAX_BACKUP_STORAGE_INFO_LENGTH); + uri_ob_str_.reset(); + storage_info_ob_str_.reset(); + generate_ts_ = OB_INVALID_TIMESTAMP; + do_task_ts_ = OB_INVALID_TIMESTAMP; + running_status_ = NULL; + io_task_ctx_ = NULL; + type_ = ObLogExternalStorageIOTaskType::INVALID_TYPE; + adapter_ = NULL; +} + +int ObLogExternalStorageIOTask::do_task() +{ + int ret = OB_SUCCESS; + do_task_ts_ = ObTimeUtility::current_time(); + running_status_->thread_id_ = gettid(); + running_status_->status_ = EnumRunningStatus::START_STATUS; + int64_t io_delay = do_task_ts_ - generate_ts_; + if (io_delay >= 500 * 1000) { + CLOG_LOG(WARN, "[io delay] handle io task delay too much", K(io_delay), KPC(this)); + } + if (OB_FAIL(do_task_())) { + CLOG_LOG(WARN, "do_task_ failed", KP(this)); + } + return ret; +} + +ObLogExternalStoragePreadTask::ObLogExternalStoragePreadTask(const ObString &uri, + const ObString &storage_info, + RunningStatus *running_status, + ObLogExternalStorageIOTaskCtx *io_task_ctx, + ObLogExternalStorageIOTaskHandleIAdapter *adapter, + const int64_t offset, + const int64_t read_buf_size, + char *read_buf, + int64_t &real_read_size) + : ObLogExternalStorageIOTask(uri, storage_info, running_status, io_task_ctx, adapter), + offset_(offset), + read_buf_size_(read_buf_size), + read_buf_(read_buf), + real_read_size_(real_read_size) +{ + type_ = ObLogExternalStorageIOTaskType::PREAD_TYPE; +} + +ObLogExternalStoragePreadTask::~ObLogExternalStoragePreadTask() +{ + offset_ = -1; + read_buf_size_ = -1; + read_buf_ = NULL; +} + +int ObLogExternalStoragePreadTask::do_task_() +{ + int ret = OB_SUCCESS; + int64_t tmp_real_read_size = 0; + if (OB_FAIL(adapter_->pread(uri_ob_str_, storage_info_ob_str_, offset_, + read_buf_, read_buf_size_, tmp_real_read_size))) { + CLOG_LOG(WARN, "pread failed", KPC(this)); + } else { + ATOMIC_AAF(&real_read_size_, tmp_real_read_size); + } + running_status_->ret_ = ret; + running_status_->status_ = EnumRunningStatus::FINSHED_STATUS; + CLOG_LOG(TRACE, "finished pread", KPC(this), K(tmp_real_read_size)); + // NB: don't use io_task_ctx_ or KPC(this) after signal. + // consider used shared ptr to manage memory release of ObLogExternalStorageIOTaskCtx + io_task_ctx_->signal(); + return ret; +} + +} // end namespace logservice +} // end namespace oceanbase diff --git a/src/logservice/ob_log_external_storage_io_task.h b/src/logservice/ob_log_external_storage_io_task.h new file mode 100644 index 0000000000..a6dfe72651 --- /dev/null +++ b/src/logservice/ob_log_external_storage_io_task.h @@ -0,0 +1,205 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ +#ifndef OCEABASE_LOGSERVICE_OB_LOG_EXTERNAL_STORAGE_IO_TASK_H_ +#define OCEABASE_LOGSERVICE_OB_LOG_EXTERNAL_STORAGE_IO_TASK_H_ +#include "lib/utility/ob_macro_utils.h" // DISALLOW_COPY_AND_ASSIGN +#include "lib/utility/ob_print_utils.h" // TO_STRING_KV +#include "lib//ob_define.h" // OB_MAX_URI_LENGTH +#include "lib/lock/ob_thread_cond.h" // ObThreadCond +#include "share/backup/ob_backup_struct.h" // OB_MAX_BACKUP_STORAGE_INFO_LENGTH +namespace oceanbase +{ +namespace common +{ +class ObString; +} +namespace logservice +{ +enum class EnumRunningStatus { + INVALID_STATUS = 0, + START_STATUS = 1, + FINSHED_STATUS = 2, + MAX_STATUS = 3 +}; + +struct RunningStatus +{ + RunningStatus(); + ~RunningStatus(); + void reset(); + int ret_; + // For debug + int64_t main_thread_id_; + int64_t thread_id_; + int64_t logical_thread_id_; + EnumRunningStatus status_; + TO_STRING_KV(KR(ret_), K_(main_thread_id), K_(thread_id), K_(logical_thread_id), K_(status)); +}; + +class ObLogExternalStorageIOTaskCtx { +public: + ObLogExternalStorageIOTaskCtx(); + ~ObLogExternalStorageIOTaskCtx(); + +public: + int init(const int64_t total_task_count); + void destroy(); + // @brief: called by the executer of ObLogExternalStorageIOTask after it has finished. + void signal(); + // @brief: wait up to timeout_us until there is no flying task. + // @return value + // OB_SUCCESS + // OB_TIMEOUT, timeout + // others, system error + int wait(int64_t timeout_us); + + int get_running_status(const int64_t idx, + RunningStatus *&running_stats) const; + int get_ret_code() const; + bool has_flying_async_task() const; + DECLARE_TO_STRING; + +private: + int construct_running_status_(const int64_t total_task_count); + void deconstruct_running_status_(); +private: + int64_t flying_task_count_; + int64_t total_task_count_; + mutable ObThreadCond condition_; + RunningStatus *running_status_; + bool is_inited_; + DISALLOW_COPY_AND_ASSIGN(ObLogExternalStorageIOTaskCtx); +}; + +enum class ObLogExternalStorageIOTaskType { + INVALID_TYPE = 0, + PREAD_TYPE = 1, + MAX_TYPE = 2 +}; + +class ObLogExternalStorageIOTaskHandleIAdapter { +public: + ObLogExternalStorageIOTaskHandleIAdapter() {} + virtual ~ObLogExternalStorageIOTaskHandleIAdapter() {} + + // Interface +public: + + virtual int exist(const common::ObString &uri, + const common::ObString &storage_info, + bool &exist) = 0; + + virtual int get_file_size(const common::ObString &uri, + const common::ObString &storage_info, + int64_t &file_size) = 0; + + virtual int pread(const common::ObString &uri, + const common::ObString &storage_info, + const int64_t offset, + char *buf, + const int64_t read_buf_size, + int64_t &real_read_size) = 0; +}; + +class ObLogExternalStorageIOTaskHandleAdapter : public ObLogExternalStorageIOTaskHandleIAdapter { +public: + ObLogExternalStorageIOTaskHandleAdapter(); + ~ObLogExternalStorageIOTaskHandleAdapter() override; + + // Implement +public: + + int exist(const common::ObString &uri, + const common::ObString &storage_info, + bool &exist) override final; + + int get_file_size(const common::ObString &uri, + const common::ObString &storage_info, + int64_t &file_size) override final; + + int pread(const common::ObString &uri, + const common::ObString &storage_info, + const int64_t offset, + char *buf, + const int64_t read_buf_size, + int64_t &real_read_size) override final; +}; + +class ObLogExternalStorageIOTask { +public: + ObLogExternalStorageIOTask(const ObString &uri, + const ObString &storage_info, + RunningStatus *running_status, + ObLogExternalStorageIOTaskCtx *io_task_ctx, + ObLogExternalStorageIOTaskHandleIAdapter *adapter); + + virtual ~ObLogExternalStorageIOTask(); + +public: + int do_task(); + + VIRTUAL_TO_STRING_KV("BaseClass", "ObLogExternalStorageIOTask", + "uri", uri_ob_str_, + "storage_info", storage_info_ob_str_, + "generate_ts", generate_ts_, + "do_task_ts", do_task_ts_, + "type", type_, + KP(running_status_), + KPC(io_task_ctx_)); +private: + virtual int do_task_() = 0; + +protected: + ObString uri_ob_str_; + char uri_str_[OB_MAX_URI_LENGTH]; + ObString storage_info_ob_str_; + char storage_info_str_[share::OB_MAX_BACKUP_STORAGE_INFO_LENGTH]; + int64_t generate_ts_; + int64_t do_task_ts_; + RunningStatus *running_status_; + ObLogExternalStorageIOTaskCtx *io_task_ctx_; + ObLogExternalStorageIOTaskType type_; + ObLogExternalStorageIOTaskHandleIAdapter *adapter_; + DISALLOW_COPY_AND_ASSIGN(ObLogExternalStorageIOTask); +}; + +class ObLogExternalStoragePreadTask : public ObLogExternalStorageIOTask { +public: + ObLogExternalStoragePreadTask(const ObString &uri, + const ObString &storage_info, + RunningStatus *running_status, + ObLogExternalStorageIOTaskCtx *io_task_ctx, + ObLogExternalStorageIOTaskHandleIAdapter *adapter, + const int64_t offset, + const int64_t read_buf_size, + char *read_buf, + int64_t &real_read_size); + ~ObLogExternalStoragePreadTask() override; + +public: + INHERIT_TO_STRING_KV("ObLogExternalStorageTask", ObLogExternalStorageIOTask, + K_(offset), K_(read_buf_size), KP(read_buf_), K_(real_read_size)); +private: + + int do_task_() override final; + +private: + int64_t offset_; + int64_t read_buf_size_; + char *read_buf_; + // shared variable between different threads, need use atomic operation. + int64_t &real_read_size_; +}; +} // end namespace logservice +} // end namespace oceanbase + +#endif diff --git a/unittest/logservice/CMakeLists.txt b/unittest/logservice/CMakeLists.txt index c720e87d3d..ea8d283710 100644 --- a/unittest/logservice/CMakeLists.txt +++ b/unittest/logservice/CMakeLists.txt @@ -38,3 +38,5 @@ log_unittest(test_role_change_handler) log_unittest(test_log_mode_mgr) ob_unittest(test_palf_throttling) ob_unittest(test_net_standby_restore_source) +ob_unittest(test_log_external_storage_handler) +ob_unittest(test_log_external_storage_io_task) diff --git a/unittest/logservice/test_log_external_storage_handler.cpp b/unittest/logservice/test_log_external_storage_handler.cpp new file mode 100644 index 0000000000..79f754d059 --- /dev/null +++ b/unittest/logservice/test_log_external_storage_handler.cpp @@ -0,0 +1,496 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include +#include +#define private public +#include "logservice/ob_log_external_storage_handler.h" +#include "logservice/ob_log_external_storage_io_task.h" +#include "share/backup/ob_backup_io_adapter.h" +#undef private +#include "share/ob_device_manager.h" +#include + +namespace oceanbase +{ +namespace unittest +{ +using namespace common; +using namespace logservice; + +class ObLogExternalStorageIOTaskHandleDummyAdapter : public ObLogExternalStorageIOTaskHandleIAdapter { +public: + + ObLogExternalStorageIOTaskHandleDummyAdapter() {} + ~ObLogExternalStorageIOTaskHandleDummyAdapter() override {} + + // Implemetn +public: + + int exist(const common::ObString &uri, + const common::ObString &storage_info, + bool &exist) override final + { + exist = true; + return OB_SUCCESS; + } + + int get_file_size(const common::ObString &uri, + const common::ObString &storage_info, + int64_t &file_size) override final + { + file_size = palf::PALF_PHY_BLOCK_SIZE; + return OB_SUCCESS; + } + + int pread(const common::ObString &uri, + const common::ObString &storage_info, + const int64_t offset, + char *buf, + const int64_t read_buf_size, + int64_t &real_read_size) override final + { + // 假设8M每秒的读取速度 + const int64_t sleep_us = read_buf_size / (8); + usleep(sleep_us); + real_read_size = read_buf_size; + // while (true) { + // int64_t old_real_read_size = ATOMIC_LOAD(&real_read_size); + // int64_t new_real_read_size = old_real_read_size + read_buf_size; + // CLOG_LOG(INFO, "dummy pread before update real_read_size", K(real_read_size), K(read_buf_size), + // K(old_real_read_size), K(new_real_read_size)); + // if (ATOMIC_BCAS(&real_read_size, old_real_read_size, new_real_read_size)) { + // CLOG_LOG(INFO, "dummy pread can update real_read_size", K(real_read_size), K(read_buf_size), + // K(old_real_read_size), K(new_real_read_size)); + // break; + // } + // CLOG_LOG(INFO, "dummy pread can not update real_read_size", K(real_read_size), K(read_buf_size), + // K(old_real_read_size), K(new_real_read_size)); + // }; + CLOG_LOG(INFO, "dummy pread success", K(real_read_size), K(read_buf_size)); + return OB_SUCCESS; + } +}; + +int delete_oss_object(const common::ObString &uri, + const ObString &oss_info) +{ + int ret = OB_SUCCESS; + common::ObBackupIoAdapter io_adapter; + share::ObBackupStorageInfo storage_info; + if (OB_FAIL(storage_info.set(OB_STORAGE_OSS, oss_info.ptr()))) { + CLOG_LOG(WARN, "set ObBackupStorageInfo failed", K(oss_info)); + } else if (OB_FAIL(io_adapter.del_file(uri, &storage_info))) { + CLOG_LOG(WARN, "del dir failed", K(oss_info)); + } else {} + return ret; +} + +int generate_oss_data(const common::ObString &uri, + const ObString &oss_info, + const char *buf, + const int64_t size) +{ + + common::ObBackupIoAdapter io_adapter; + share::ObBackupStorageInfo storage_info; + int ret = OB_SUCCESS; + bool exist = false; + if (OB_FAIL(storage_info.set(OB_STORAGE_OSS, oss_info.ptr()))) { + CLOG_LOG(WARN, "set ObBackupStorageInfo failed", K(oss_info)); + } else if (OB_FAIL(io_adapter.is_exist(uri, &storage_info, exist))) { + CLOG_LOG(WARN, "is_exist failed", K(oss_info)); + } else if (!exist && OB_FAIL(io_adapter.write_single_file(uri, &storage_info, buf, size))) { + CLOG_LOG(WARN, "ObBackupStorageInfo write_single_file failed", K(oss_info)); + } else { + } + return ret; +} + +TEST(TestLogExternalStorageHandler, test_log_external_storage_handler) +{ + ObLogExternalStorageHandler handler; + // 测试异常内存状态——没有init + EXPECT_EQ(false, handler.is_inited_); + EXPECT_EQ(OB_NOT_INIT, handler.start(10)); + const int64_t MB = 1*1024*1024; + ObString uri = "oss://runlin_test"; + ObString storage_info = "runlin_test"; + int64_t offset = 0; + int64_t read_buf_size = 15*MB; + char *read_buf = reinterpret_cast(ob_malloc(read_buf_size, "unittest")); + ASSERT_NE(nullptr, read_buf); + int64_t real_read_size = 0; + EXPECT_EQ(OB_NOT_INIT, handler.pread(uri, storage_info, offset, read_buf, read_buf_size, real_read_size)); + EXPECT_EQ(OB_NOT_INIT, handler.resize(16, 0)); + + // 测试异常内存状态——没有start + EXPECT_EQ(OB_SUCCESS, handler.init()); + ObLogExternalStorageIOTaskHandleDummyAdapter adapter; + ObLogExternalStorageIOTaskHandleIAdapter *origin_adapter = handler.handle_adapter_; + handler.handle_adapter_ = &adapter; + EXPECT_EQ(true, handler.is_inited_); + EXPECT_EQ(OB_NOT_RUNNING, handler.pread(uri, storage_info, offset, read_buf, read_buf_size, real_read_size)); + EXPECT_EQ(OB_NOT_RUNNING, handler.resize(16, 0)); + + + // 测试invalid argument + EXPECT_EQ(OB_INVALID_ARGUMENT, handler.start(-1)); + EXPECT_EQ(OB_INVALID_ARGUMENT, handler.start(ObLogExternalStorageHandler::CONCURRENCY_LIMIT+1)); + + // start 成功 + const int64_t concurrency = 16; + EXPECT_EQ(OB_SUCCESS, handler.start(concurrency)); + EXPECT_EQ(true, handler.is_running_); + EXPECT_EQ(concurrency, handler.concurrency_); + EXPECT_EQ(concurrency*ObLogExternalStorageHandler::CAPACITY_COEFFICIENT, + handler.capacity_); + + // 验证读取——invalid argument + { + ObString empty_uri; + EXPECT_EQ(OB_INVALID_ARGUMENT, handler.pread(empty_uri, storage_info, offset, read_buf, read_buf_size, real_read_size)); + ObString empty_storage_info; + EXPECT_EQ(OB_INVALID_ARGUMENT, handler.pread(uri, empty_storage_info, offset, read_buf, read_buf_size, real_read_size)); + int64_t invalid_offset = -1; + EXPECT_EQ(OB_INVALID_ARGUMENT, handler.pread(uri, storage_info, invalid_offset, read_buf, read_buf_size, real_read_size)); + invalid_offset = 100*1024*1024; + EXPECT_EQ(OB_INVALID_ARGUMENT, handler.pread(uri, storage_info, invalid_offset, read_buf, read_buf_size, real_read_size)); + char *invalid_read_buf = NULL; + EXPECT_EQ(OB_INVALID_ARGUMENT, handler.pread(uri, storage_info, offset, invalid_read_buf, read_buf_size, real_read_size)); + int64_t invalid_read_buf_size = 0; + EXPECT_EQ(OB_INVALID_ARGUMENT, handler.pread(uri, storage_info, offset, read_buf, invalid_read_buf_size, real_read_size)); + } + + // 验证resize——invalid argument + { + int64_t invalid_concurrency = -1; + EXPECT_EQ(OB_INVALID_ARGUMENT, handler.resize(invalid_concurrency, 0)); + int64_t invalid_timeout_us = -1; + EXPECT_EQ(OB_INVALID_ARGUMENT, handler.resize(concurrency, invalid_timeout_us)); + } + + // 验证私有函数 + { + // 验证is_valid_concurrency_ + int64_t invalid_concurrency = -1; + EXPECT_EQ(false, handler.is_valid_concurrency_(invalid_concurrency)); + invalid_concurrency = ObLogExternalStorageHandler::CONCURRENCY_LIMIT + 1; + EXPECT_EQ(false, handler.is_valid_concurrency_(invalid_concurrency)); + + // 验证get_async_task_count_ + // 单个任务最小2M, 在concurrency足够的情况下,最多存在8个异步任务 + int64_t total_size = 15 * MB; + EXPECT_EQ(8, handler.get_async_task_count_(total_size)); + // 最多存在51个异步任务,由于并发度问题,只能存在16个 + total_size = 101 * MB; + EXPECT_EQ(16, handler.get_async_task_count_(total_size)); + + ObLogExternalStorageIOTaskCtx *async_task_ctx = NULL; + // 验证construct_async_tasks_and_push_them_into_thread_pool_ + real_read_size = 0; + EXPECT_EQ(OB_SUCCESS, handler.construct_async_tasks_and_push_them_into_thread_pool_ + (uri, storage_info, offset, read_buf, read_buf_size, real_read_size, async_task_ctx)); + EXPECT_EQ(handler.get_async_task_count_(read_buf_size), async_task_ctx->total_task_count_); + EXPECT_EQ(OB_SUCCESS, handler.wait_async_tasks_finished_(async_task_ctx)); + ASSERT_EQ(real_read_size, read_buf_size); + CLOG_LOG(INFO, "after test private interface", K(real_read_size), K(read_buf_size)); + } + + // 验证公有函数 + { + real_read_size = 0; + EXPECT_EQ(OB_SUCCESS, handler.pread(uri, storage_info, offset, read_buf, read_buf_size, real_read_size)); + EXPECT_EQ(read_buf_size, real_read_size); + CLOG_LOG(INFO, "after first read", K(read_buf_size), K(real_read_size)); + int64_t new_concurrency = 8; + EXPECT_EQ(OB_SUCCESS, handler.resize(new_concurrency)); + EXPECT_EQ(new_concurrency, handler.concurrency_); + EXPECT_EQ(new_concurrency*ObLogExternalStorageHandler::CAPACITY_COEFFICIENT, + handler.capacity_); + new_concurrency = 0; + EXPECT_EQ(OB_SUCCESS, handler.resize(new_concurrency)); + + real_read_size = 0; + EXPECT_EQ(OB_SUCCESS, handler.pread(uri, storage_info, offset, read_buf, read_buf_size, real_read_size)); + EXPECT_EQ(read_buf_size, real_read_size); + CLOG_LOG(INFO, "after second read", K(read_buf_size), K(real_read_size)); + + new_concurrency = 32; + EXPECT_EQ(OB_SUCCESS, handler.resize(new_concurrency)); + + real_read_size = 0; + EXPECT_EQ(OB_SUCCESS, handler.pread(uri, storage_info, offset, read_buf, read_buf_size, real_read_size)); + EXPECT_EQ(read_buf_size, real_read_size); + CLOG_LOG(INFO, "after third read", K(read_buf_size), K(real_read_size)); + + read_buf_size = 63*1000*1000+123; + real_read_size = 0; + EXPECT_EQ(OB_SUCCESS, handler.pread(uri, storage_info, offset, read_buf, read_buf_size, real_read_size)); + EXPECT_EQ(read_buf_size, real_read_size); + CLOG_LOG(INFO, "after fourth read", K(read_buf_size), K(real_read_size)); + } + + // 并发场景验证 + { + const int64_t pread_thread_count = 16; + std::vector pread_thread; + auto read_func = [&](){ + for (int i = 0; i < 8; i++) { + srandom(ObTimeUtility::current_time()); + const int64_t tmp_read_buf_size = random()%64*1024*1024 + 1; + int64_t tmp_real_read_size = 0; + int64_t tmp_offset = 64*1024*1024 - tmp_read_buf_size; + handler.pread(uri, storage_info, tmp_offset, read_buf, tmp_read_buf_size, tmp_real_read_size); + EXPECT_EQ(tmp_real_read_size, tmp_read_buf_size); + CLOG_LOG(INFO, "pread in thread success", K(tmp_real_read_size), K(tmp_read_buf_size)); + } + }; + auto resize_func = [&](){ + for (int i = 0; i < 8; i++) { + srandom(ObTimeUtility::current_time()); + handler.resize(random()%37); + usleep(1000 * 1000); + } + }; + for (int i = 0; i < pread_thread_count; i++) { + pread_thread.emplace_back(std::thread(read_func)); + } + std::thread resize_thread(resize_func); + + for (int i = 0; i < pread_thread_count; i++) { + pread_thread[i].join(); + } + resize_thread.join(); + } + handler.handle_adapter_ = origin_adapter; +} + +// 测试真实的oss object +TEST(TestLogExternalStorageHandler, test_oss_object) +{ + // 验证oss 不可访问 + { + // 构造错误的oss meta信息 + CLOG_LOG(INFO, "test oss can not access"); + std::string trick_code_first = "host=123&acc"; + std::string trick_code_second = "ess_id=111&acce"; + std::string trick_code_third = "ss_key=222"; + std::string trick_code_fourth = trick_code_first + trick_code_second + trick_code_third; + ObString oss_path(trick_code_fourth.c_str()); + ObString uri("oss://backup_dir/1234"); + ObLogExternalStorageIOTaskHandleAdapter adapter; + bool exist = false; + EXPECT_EQ(OB_OSS_ERROR, adapter.exist(uri, oss_path, exist)); + EXPECT_EQ(false, exist); + int64_t file_size = 0; + EXPECT_EQ(OB_OSS_ERROR, adapter.get_file_size(uri, oss_path, file_size)); + EXPECT_EQ(0, file_size); + const int64_t buf_len = 4096; + char buf[buf_len]; + int64_t real_read_size = 0; + EXPECT_EQ(OB_OSS_ERROR, adapter.pread(uri, oss_path, 0, buf, buf_len, real_read_size)); + EXPECT_EQ(0, real_read_size); + } + + // 验证oss 可访问 + // 需要保证如下的OSS能持续访问 + { + CLOG_LOG(INFO, "test oss can access"); + int ret = OB_SUCCESS; + const char * oss_bucket="oss://antsys-oceanbasebackup/shuning.tsn/"; + const char *oss_host="cn-hangzhou-alipay-b.oss-cdn.aliyun-inc.com"; + std::string trick_id_first = "LTAI4Fdwx"; + std::string trick_id_second = "9iFgZso4"; + std::string trick_id_third = "CqyHPs7"; + std::string oss_id = trick_id_first + trick_id_second + trick_id_third; + std::string trick_key_first = "ER51kMnlmu"; + std::string trick_key_second = "3zXwcxczJMb"; + std::string trick_key_third = "YzJIgrY9O"; + std::string oss_key = trick_key_first + trick_key_second + trick_key_third; + std::string trick_code_first = "host=cn-hangzhou-alipay-b.oss-cdn.aliyun-inc.com&acc"; + std::string trick_code_second = "ess_id=" + oss_id + "&acce"; + std::string trick_code_third = "ss_key=" + oss_key; + std::string trick_code_fourth = trick_code_first + trick_code_second + trick_code_third; + ObString oss_path(trick_code_fourth.c_str()); + ObLogExternalStorageIOTaskHandleAdapter adapter; + + std::vector uris; + std::vector object_checksums; + std::string base_uri = "oss://antsys-oceanbasebackup/shuning.tsn/runlin_test_pf/"; + const int64_t total_oss_object = 5; + // 测试目录下存在5个clog文件 + for (int i = 0; i < total_oss_object; i++) { + uris.push_back(base_uri+std::to_string(i)); + } + int exist_num = 0; + const int64_t buf_len = 64*1024*1024; + char *buf_self_pread = reinterpret_cast(ob_malloc(buf_len, "unittest")); + memset(buf_self_pread, 'x', buf_len); + + // oss地址可能变化,当oss可访问时,进行如下测试 + for (int i = 0; i < total_oss_object; i++) { + bool exist = false; + int64_t real_read_size = 0; + // farm环境写oss太慢了,暂时先单测不运行 + // if (OB_FAIL(generate_oss_data(uris[i].c_str(), oss_path, buf_self_pread, buf_len))) { + // CLOG_LOG(ERROR, "oss can not access", K(uris[i].c_str()), K(oss_path), K(exist)); + // } else { + // int64_t checksum = ob_crc64(buf_self_pread, buf_len); + // object_checksums.push_back(checksum); + // exist_num++; + // memset(buf_self_pread, 'y', buf_len); + // } + } + if (exist_num == total_oss_object) + { + int64_t real_read_size = 0; + // 读的offer为文件尾,读长度为0 + EXPECT_EQ(OB_SUCCESS, adapter.pread(uris[0].c_str(), oss_path, buf_len, buf_self_pread, buf_len, real_read_size)); + EXPECT_EQ(0, real_read_size); + CLOG_LOG(INFO, "read outof upper bound", K(ret), K(real_read_size)); + // 读的offer为文件尾-1000,读长度为0 + EXPECT_EQ(OB_SUCCESS, adapter.pread(uris[0].c_str(), oss_path, buf_len-1000, buf_self_pread, buf_len, real_read_size)); + EXPECT_EQ(1000, real_read_size); + // 读的offer为文件尾-4*1000*1000,读长度为0 + EXPECT_EQ(OB_SUCCESS, adapter.pread(uris[0].c_str(), oss_path, buf_len-1000*1000*4, buf_self_pread, buf_len, real_read_size)); + EXPECT_EQ(1000*1000*4, real_read_size); + // 读的offer为文件尾+1000,读长度为0, 返回invalid argument + // 底层读接口会判断读取的offset是否超过文件长度,超过报错invalid argment + // OSS的feature: 如果指定的offset超过文件长度或者offset+待读取长度大于文件长度,会返回整个文件的数据内容且不会报错 + EXPECT_EQ(OB_INVALID_ARGUMENT, adapter.pread(uris[0].c_str(), oss_path, buf_len+1000, buf_self_pread, buf_len, real_read_size)); + // 读的offer为文件尾+1000*1000*4,读长度为0, 返回invalid argument + EXPECT_EQ(OB_INVALID_ARGUMENT, adapter.pread(uris[0].c_str(), oss_path, buf_len+1000*1000*4, buf_self_pread, buf_len, real_read_size)); + CLOG_LOG(INFO, "read outof upper bound", K(ret), K(real_read_size)); + } + if (exist_num == total_oss_object) + // 验证性能 + { + EXPECT_EQ(OB_SUCCESS, ObDeviceManager::get_instance().init_devices_env()); + ObLogExternalStorageHandler handler; + const int64_t concurrency = 16; + EXPECT_EQ(OB_SUCCESS, handler.init()); + EXPECT_EQ(OB_SUCCESS, handler.start(concurrency)); + + int64_t start_ts = ObTimeUtility::current_time(); + int64_t real_read_size = 0; + for (int i = 0; i < total_oss_object; i++) { + EXPECT_EQ(OB_SUCCESS, handler.pread( + uris[i].c_str(), oss_path, 0, buf_self_pread, buf_len, real_read_size + )); + EXPECT_EQ(real_read_size, buf_len); + } + int64_t cost_ts = ObTimeUtility::current_time() - start_ts; + int64_t bw = total_oss_object * buf_len / cost_ts; + CLOG_LOG(INFO, "band width", K(bw), K(cost_ts)); + } + // 验证读取正确性 + if (exist_num == total_oss_object) + { + ObLogExternalStorageHandler handler; + const int64_t concurrency = 16; + EXPECT_EQ(OB_SUCCESS, handler.init()); + EXPECT_EQ(OB_SUCCESS, handler.start(concurrency)); + ASSERT_NE(nullptr, buf_self_pread); + for (int i = 0; i < total_oss_object; i++) { + int64_t real_read_size = 0; + memset(buf_self_pread, '0', buf_len); + EXPECT_EQ(OB_SUCCESS, handler.pread( + uris[i].c_str(), oss_path, 0, buf_self_pread, buf_len, real_read_size + )); + EXPECT_EQ(buf_len, real_read_size); + int64_t checksum = ob_crc64(buf_self_pread, buf_len); + EXPECT_EQ(checksum, object_checksums[i]); + } + + if (NULL != buf_self_pread) { + ob_free(buf_self_pread); + buf_self_pread = NULL; + } + + // 随机位置读取 + { + auto read_func = [&]() { + for (int i = 0; i < 32; i++) { + srandom(ObTimeUtility::current_time()); + const int64_t object_idx = random() % total_oss_object; + const int64_t file_size = palf::PALF_PHY_BLOCK_SIZE; + int64_t tmp_read_offset = random() % file_size; + const int64_t tmp_read_size = random() % file_size; + char *tmp_read_buf1 = reinterpret_cast(ob_malloc(tmp_read_size, "unittest")); + char *tmp_read_buf2 = reinterpret_cast(ob_malloc(tmp_read_size, "unittest")); + int64_t tmp_real_read_size = 0; + ASSERT_NE(nullptr, tmp_read_buf1); + ASSERT_NE(nullptr, tmp_read_buf2); + EXPECT_EQ(OB_SUCCESS, handler.pread( + uris[object_idx].c_str(), oss_path, tmp_read_offset, tmp_read_buf1, tmp_read_size, tmp_real_read_size + )); + EXPECT_EQ(std::min(file_size - tmp_read_offset, tmp_read_size), + tmp_real_read_size); + tmp_real_read_size = 0; + EXPECT_EQ(OB_SUCCESS, adapter.pread( + uris[object_idx].c_str(), oss_path, tmp_read_offset, tmp_read_buf2, tmp_read_size, tmp_real_read_size + )); + EXPECT_EQ(std::min(file_size - tmp_read_offset, tmp_read_size), + tmp_real_read_size); + EXPECT_EQ(0, memcmp(tmp_read_buf1, tmp_read_buf2, tmp_real_read_size)); + if (NULL != tmp_read_buf1) { + ob_free(tmp_read_buf1); + tmp_read_buf1 = NULL; + } + if (NULL != tmp_read_buf2) { + ob_free(tmp_read_buf2); + tmp_read_buf2 = NULL; + } + } + }; + std::vector tmp_pread_threads; + + for (int i = 0; i < 4; i++) { + tmp_pread_threads.emplace_back(read_func); + } + auto tmp_resize_func = [&](){ + for (int i = 0; i < 4; i++) { + srandom(ObTimeUtility::current_time()); + handler.resize(random()%4); + usleep(1000 * 1000); + } + }; + std::thread tmp_resize_thread(tmp_resize_func); + for (int i = 0; i < 4; i++) { + tmp_pread_threads[i].join(); + } + tmp_resize_thread.join(); + } + for (auto uri : uris) { + delete_oss_object(uri.c_str(), oss_path); + } + CLOG_LOG(INFO, "delete success", K(base_uri.c_str()), K(oss_path)); + } else { + CLOG_LOG(ERROR, "the object of oss is not correct", K(exist_num), K(oss_path)); + } + } +} + +} +} + +int main(int argc, char **argv) +{ + system("rm -rf test_log_external_storage_handler.log*"); + OB_LOGGER.set_file_name("test_log_external_storage_handler.log", true); + OB_LOGGER.set_log_level("TRACE"); + srandom(ObTimeUtility::current_time()); + PALF_LOG(INFO, "begin unittest::test_log_external_storage_handler"); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/unittest/logservice/test_log_external_storage_io_task.cpp b/unittest/logservice/test_log_external_storage_io_task.cpp new file mode 100644 index 0000000000..6efa15945e --- /dev/null +++ b/unittest/logservice/test_log_external_storage_io_task.cpp @@ -0,0 +1,164 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define private public +#define protected public +#include "logservice/ob_log_external_storage_io_task.h" +#undef protected +#undef private +#include + +namespace oceanbase +{ +namespace unittest +{ +using namespace common; +using namespace logservice; + +class ObLogExternalStorageIOTaskHandleDummyAdapter : public ObLogExternalStorageIOTaskHandleIAdapter { +public: + + ObLogExternalStorageIOTaskHandleDummyAdapter() {} + ~ObLogExternalStorageIOTaskHandleDummyAdapter() override {} + + // Implemetn +public: + + int exist(const common::ObString &uri, + const common::ObString &storage_info, + bool &exist) override final + { + exist = true; + return OB_SUCCESS; + } + + int get_file_size(const common::ObString &uri, + const common::ObString &storage_info, + int64_t &file_size) override final + { + file_size = 0; + return OB_SUCCESS; + } + + int pread(const common::ObString &uri, + const common::ObString &storage_info, + const int64_t offset, + char *buf, + const int64_t read_buf_size, + int64_t &real_read_size) override final + { + real_read_size = 0; + return OB_SUCCESS; + } +}; + +TEST(TestLogExternalStorageIOTaskCtx, test_io_ctx) +{ + CLOG_LOG_RET(INFO, OB_SUCCESS, "start test_io_ctx"); + ObLogExternalStorageIOTaskCtx io_ctx; + EXPECT_EQ(OB_INVALID_ARGUMENT, io_ctx.init(-1)); + EXPECT_EQ(false, io_ctx.is_inited_); + EXPECT_EQ(-1, io_ctx.flying_task_count_); + EXPECT_EQ(-1, io_ctx.total_task_count_); + EXPECT_EQ(nullptr, io_ctx.running_status_); + const int64_t test_task_count = 16; + EXPECT_EQ(OB_SUCCESS, io_ctx.init(test_task_count)); + EXPECT_EQ(test_task_count, io_ctx.flying_task_count_); + EXPECT_EQ(test_task_count, io_ctx.total_task_count_); + for (int i = 0; i < test_task_count; i++) { + RunningStatus *ptr = NULL; + EXPECT_EQ(OB_SUCCESS, io_ctx.get_running_status(i, ptr)); + EXPECT_NE(nullptr, ptr); + EXPECT_EQ(-1, ptr->ret_); + EXPECT_EQ(-1, ptr->thread_id_); + EXPECT_EQ(i, ptr->logical_thread_id_); + EXPECT_EQ(ptr->status_, EnumRunningStatus::INVALID_STATUS); + } + // 唤醒test_task_count - 1次,wait依旧报错OB_TIMEOUT + for (int i = 0; i < test_task_count - 1; i++) { + EXPECT_EQ(OB_TIMEOUT, io_ctx.wait(1000)); + io_ctx.signal(); + EXPECT_EQ(true, io_ctx.has_flying_async_task()); + CLOG_LOG(INFO, "io_ctx wait success", K(i), K(io_ctx)); + } + // 唤醒test_task_count次后,wait返回成功 + EXPECT_EQ(OB_TIMEOUT, io_ctx.wait(1000)); + io_ctx.signal(); + EXPECT_EQ(OB_SUCCESS, io_ctx.wait(1000)); + EXPECT_EQ(false, io_ctx.has_flying_async_task()); + EXPECT_EQ(-1, io_ctx.get_ret_code()); + + // 无唤醒,再次wait也能应为flying_task_count为0返回成功 + EXPECT_EQ(0, io_ctx.flying_task_count_); + EXPECT_EQ(OB_SUCCESS, io_ctx.wait(1000)); + + // 验证destroy后,内存状态被重置 + io_ctx.destroy(); + EXPECT_EQ(false, io_ctx.is_inited_); + EXPECT_EQ(-1, io_ctx.flying_task_count_); + EXPECT_EQ(-1, io_ctx.total_task_count_); + EXPECT_EQ(NULL, io_ctx.running_status_); + + CLOG_LOG(INFO, "test_io_ctx success", K(io_ctx)); +} + +TEST(TestLogExternalStorageIOTask, test_pread_task) +{ + CLOG_LOG_RET(INFO, OB_SUCCESS, "start test_pread_task"); + ObLogExternalStorageIOTaskCtx io_ctx; + const int64_t test_task_count = 16; + EXPECT_EQ(OB_SUCCESS, io_ctx.init(16)); + ObString uri = "oss://runlin_test"; + ObString storage_info = "runlin_test"; + + ObLogExternalStoragePreadTask *pread_task_array = NULL; + pread_task_array = reinterpret_cast( + ob_malloc(sizeof(ObLogExternalStoragePreadTask) * test_task_count, "unittest")); + ASSERT_NE(nullptr, pread_task_array); + + char buff[test_task_count]; + int64_t real_read_size = 0; + ObLogExternalStorageIOTaskHandleDummyAdapter adapter; + for (int i = 0; i < test_task_count; i++) { + RunningStatus *running_status = NULL; + EXPECT_EQ(OB_SUCCESS, io_ctx.get_running_status(i, running_status)); + ObLogExternalStoragePreadTask *tmp_ptr = + new(pread_task_array+i) ObLogExternalStoragePreadTask + (uri, storage_info, running_status, &io_ctx, &adapter, i, 1, buff+i, real_read_size); + ASSERT_NE(nullptr, tmp_ptr); + EXPECT_EQ(ObLogExternalStorageIOTaskType::PREAD_TYPE, tmp_ptr->type_); + EXPECT_EQ(OB_SUCCESS, tmp_ptr->do_task()); + if (i != test_task_count - 1) { + EXPECT_EQ(OB_TIMEOUT, io_ctx.wait(1000)); + } + } + EXPECT_EQ(OB_SUCCESS, io_ctx.wait(1000)); + io_ctx.signal(); + EXPECT_EQ(0, io_ctx.flying_task_count_); + EXPECT_EQ(OB_SUCCESS, io_ctx.wait(1000)); + CLOG_LOG(INFO, "test_pread_task success", K(io_ctx)); + ob_free(pread_task_array); +} + +} +} + +int main(int argc, char **argv) +{ + system("rm -f test_log_external_storage_io_task.log*"); + OB_LOGGER.set_file_name("test_log_external_storage_io_task.log", true); + OB_LOGGER.set_log_level("INFO"); + PALF_LOG(INFO, "begin unittest::test_log_external_storage_io_task"); + ::testing::InitGoogleTest(&argc, argv); + oceanbase::ObClusterVersion::get_instance().update_data_version(DATA_CURRENT_VERSION); + return RUN_ALL_TESTS(); +}