support read oss object concurrently

This commit is contained in:
HaHaJeff
2023-07-17 07:18:52 +00:00
committed by ob-robot
parent acc87e2b74
commit 27cbb72730
12 changed files with 1930 additions and 2 deletions

View File

@ -50,6 +50,8 @@ ob_set_subtarget(ob_logservice common
ob_log_flashback_service.cpp
ob_net_keepalive_adapter.cpp
ob_log_monitor.cpp
ob_log_external_storage_handler.cpp
ob_log_external_storage_io_task.cpp
)
ob_set_subtarget(ob_logservice common_mixed

View File

@ -0,0 +1,409 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "ob_log_external_storage_handler.h"
#include "lib/string/ob_string.h" // ObString
#include "lib/stat/ob_latch_define.h" // LOG_EXTERNAL_STORAGE_HANDLER_LOCK
#include "share/rc/ob_tenant_base.h" // MTL_NEW
#include "ob_log_external_storage_io_task.h" // ObLogExternalStorageIOTask
namespace oceanbase
{
namespace logservice
{
using namespace common;
const int64_t ObLogExternalStorageHandler::CONCURRENCY_LIMIT = 128;
const int64_t ObLogExternalStorageHandler::DEFAULT_RETRY_INTERVAL = 2 * 1000;
const int64_t ObLogExternalStorageHandler::DEFAULT_TIME_GUARD_THRESHOLD = 2 * 1000;
const int64_t ObLogExternalStorageHandler::DEFAULT_PREAD_TIME_GUARD_THRESHOLD = 100 * 1000;
const int64_t ObLogExternalStorageHandler::DEFAULT_RESIZE_TIME_GUARD_THRESHOLD = 100 * 1000;
const int64_t ObLogExternalStorageHandler::CAPACITY_COEFFICIENT = 64;
const int64_t ObLogExternalStorageHandler::SINGLE_TASK_MINIMUM_SIZE = 2 * 1024 * 1024;
ObLogExternalStorageHandler::ObLogExternalStorageHandler()
: concurrency_(-1),
capacity_(-1),
resize_rw_lock_(common::ObLatchIds::LOG_EXTERNAL_STORAGE_HANDLER_RW_LOCK),
construct_async_task_lock_(common::ObLatchIds::LOG_EXTERNAL_STORAGE_HANDLER_LOCK),
handle_adapter_(NULL),
is_running_(false),
is_inited_(false)
{}
ObLogExternalStorageHandler::~ObLogExternalStorageHandler()
{
destroy();
}
int ObLogExternalStorageHandler::init()
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
CLOG_LOG(WARN, "ObLogExternalStorageHandler inited twice", KPC(this));
} else if (NULL == (handle_adapter_ = MTL_NEW(ObLogExternalStorageIOTaskHandleAdapter, "ObLogEXTHandler"))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
CLOG_LOG(WARN, "allocate memory failed");
} else {
concurrency_ = 0;
capacity_ = 0;
is_running_ = false;
is_inited_ = true;
CLOG_LOG(INFO, "ObLogExternalStorageHandler inits successfully", KPC(this));
}
if (OB_FAIL(ret) && OB_INIT_TWICE != ret) {
destroy();
}
return ret;
}
int ObLogExternalStorageHandler::start(const int64_t concurrency)
{
int ret = OB_SUCCESS;
WLockGuard guard(resize_rw_lock_);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
CLOG_LOG(WARN, "ObLogExternalStorageHandler not inited", K(concurrency), KPC(this));
} else if (is_running_) {
CLOG_LOG(WARN, "ObLogExternalStorageHandler has run", K(concurrency), KPC(this));
} else if (!is_valid_concurrency_(concurrency)) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "invalid argument", K(concurrency), KPC(this));
} else if (0 != concurrency
&& OB_FAIL(ObSimpleThreadPool::init(
concurrency, CAPACITY_COEFFICIENT * concurrency, "ObLogEXTTP", MTL_ID()))) {
CLOG_LOG(WARN, "invalid argument", K(concurrency), KPC(this));
} else {
concurrency_ = concurrency;
capacity_ = CAPACITY_COEFFICIENT * concurrency;
is_running_ = true;
}
return ret;
}
void ObLogExternalStorageHandler::stop()
{
WLockGuard guard(resize_rw_lock_);
is_running_ = false;
ObSimpleThreadPool::stop();
}
void ObLogExternalStorageHandler::wait()
{
WLockGuard guard(resize_rw_lock_);
ObSimpleThreadPool::wait();
}
void ObLogExternalStorageHandler::destroy()
{
CLOG_LOG_RET(WARN, OB_SUCCESS, "ObLogExternalStorageHandler destroy");
is_inited_ = false;
stop();
wait();
ObSimpleThreadPool::destroy();
concurrency_ = -1;
capacity_ = -1;
MTL_DELETE(ObLogExternalStorageIOTaskHandleIAdapter, "ObLogEXTHandler", handle_adapter_);
handle_adapter_ = NULL;
}
int ObLogExternalStorageHandler::resize(const int64_t new_concurrency,
const int64_t timeout_us)
{
int ret = OB_SUCCESS;
ObTimeGuard time_guard("resize thread pool", DEFAULT_RESIZE_TIME_GUARD_THRESHOLD);
WLockGuardTimeout guard(resize_rw_lock_, timeout_us, ret);
time_guard.click("after hold lock");
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
CLOG_LOG(WARN, "ObLogExternalStorageHandler not inited", KPC(this), K(new_concurrency), K(timeout_us));
} else if (!is_running_) {
ret = OB_NOT_RUNNING;
CLOG_LOG(WARN, "ObLogExternalStorageHandler not running", KPC(this), K(new_concurrency), K(timeout_us));
} else if (!is_valid_concurrency_(new_concurrency) || 0 > timeout_us) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "invalid arguments", KPC(this), K(new_concurrency), K(timeout_us));
// hold lock failed
} else if (OB_FAIL(ret)) {
CLOG_LOG(WARN, "hold lock failed", KPC(this), K(new_concurrency), K(timeout_us));
} else if (new_concurrency == concurrency_) {
CLOG_LOG(TRACE, "no need resize", KPC(this), K(new_concurrency));
} else {
destroy_and_init_new_thread_pool_(new_concurrency);
time_guard.click("after create new thread pool");
concurrency_ = new_concurrency;
capacity_ = CAPACITY_COEFFICIENT * new_concurrency;
CLOG_LOG(INFO, "ObLogExternalStorageHandler resize success", K(new_concurrency));
}
return ret;
}
int ObLogExternalStorageHandler::pread(const common::ObString &uri,
const common::ObString &storage_info,
const int64_t offset,
char *buf,
const int64_t read_buf_size,
int64_t &real_read_size)
{
int ret = OB_SUCCESS;
int64_t async_task_count = 0;
ObTimeGuard time_guard("slow pread", DEFAULT_PREAD_TIME_GUARD_THRESHOLD);
ObLogExternalStorageIOTaskCtx *async_task_ctx = NULL;
int64_t file_size = palf::PALF_PHY_BLOCK_SIZE;
int64_t real_read_buf_size = 0;
RLockGuard guard(resize_rw_lock_);
time_guard.click("after hold by lock");
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
CLOG_LOG(WARN, "ObLogExternalStorageHandler not init", K(uri), K(storage_info), K(offset), KP(buf), K(read_buf_size));
} else if (!is_running_) {
ret = OB_NOT_RUNNING;
CLOG_LOG(WARN, "ObLogExternalStorageHandler not running", K(uri), K(storage_info), K(offset), KP(buf), K(read_buf_size));
} else if (uri.empty() || storage_info.empty() || 0 > offset || NULL == buf || 0 >= read_buf_size) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "ObLogExternalStorageHandler invalid argument", K(uri), K(storage_info), K(offset), KP(buf), K(read_buf_size));
} else if (offset >= file_size) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "read position lager than file size, invalid argument", K(file_size), K(offset), K(uri));
} else if (FALSE_IT(time_guard.click("after get file size"))) {
// NB: limit read size.
} else if (FALSE_IT(real_read_buf_size = std::min(file_size - offset, read_buf_size))) {
} else if (OB_FAIL(construct_async_tasks_and_push_them_into_thread_pool_(
uri, storage_info, offset, buf, real_read_buf_size, real_read_size, async_task_ctx))) {
CLOG_LOG(WARN, "construct_async_task_and_push_them_into_thread_pool_ failed", K(uri), K(storage_info),
K(offset), KP(buf), K(read_buf_size));
} else if (FALSE_IT(time_guard.click("after construct async tasks"))) {
} else if (OB_FAIL(wait_async_tasks_finished_(async_task_ctx))) {
CLOG_LOG(WARN, "wait_async_tasks_finished_ failed", K(uri), K(storage_info),
K(offset), KP(buf), K(read_buf_size), KPC(async_task_ctx));
} else if (FALSE_IT(time_guard.click("after wait async tasks"))) {
} else {
// if there is a failure of any async task, return the error of it, otherwise, return OB_SUCCESS.
ret = async_task_ctx->get_ret_code();
if (OB_FAIL(ret)) {
CLOG_LOG(WARN, "pread finished", K(time_guard), K(uri), K(storage_info), K(offset), K(read_buf_size),
K(real_read_size));
} else {
CLOG_LOG(TRACE, "pread finished", K(time_guard), K(uri), K(storage_info), K(offset), K(read_buf_size),
K(real_read_size));
}
}
// FIXME: consider use shared ptr.
if (NULL != async_task_ctx) {
MTL_DELETE(ObLogExternalStorageIOTaskCtx, "ObLogEXTHandler", async_task_ctx);
async_task_ctx = NULL;
}
return ret;
}
void ObLogExternalStorageHandler::handle(void *task)
{
int ret = OB_SUCCESS;
ObLogExternalStorageIOTask *io_task = reinterpret_cast<ObLogExternalStorageIOTask*>(task);
if (OB_ISNULL(io_task)) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(WARN, "io_task is nullptr, unexpected error!!!", KP(io_task), KPC(this));
} else if (OB_FAIL(io_task->do_task())) {
CLOG_LOG(WARN, "do_task failed", KP(io_task), KPC(this));
} else {
CLOG_LOG(TRACE, "do_task success", KP(io_task), KPC(this));
}
if (OB_NOT_NULL(io_task)) {
MTL_DELETE(ObLogExternalStorageIOTask, "ObLogEXTHandler", io_task);
}
}
bool ObLogExternalStorageHandler::is_valid_concurrency_(const int64_t concurrency) const
{
return 0 <= concurrency && CONCURRENCY_LIMIT >= concurrency;
}
int64_t ObLogExternalStorageHandler::get_async_task_count_(const int64_t total_size) const
{
// TODO by runlin: consider free async thread num.
int64_t minimum_async_task_size = SINGLE_TASK_MINIMUM_SIZE;
int64_t minimum_async_task_count = 1;
int64_t async_task_count = std::max(minimum_async_task_count,
std::min(concurrency_,
(total_size + minimum_async_task_size - 1)/minimum_async_task_size));
return async_task_count;
}
int ObLogExternalStorageHandler::construct_async_tasks_and_push_them_into_thread_pool_(
const common::ObString &uri,
const common::ObString &storage_info,
const int64_t offset,
char *read_buf,
const int64_t read_buf_size,
int64_t &real_read_size,
ObLogExternalStorageIOTaskCtx *&async_task_ctx)
{
int ret = OB_SUCCESS;
int64_t async_task_count = get_async_task_count_(read_buf_size);
int64_t async_task_size = (read_buf_size + async_task_count - 1) / async_task_count;
int64_t remained_task_count = async_task_count;
int64_t remained_total_size = read_buf_size;
real_read_size = 0;
async_task_ctx = NULL;
if (NULL == (async_task_ctx = MTL_NEW(ObLogExternalStorageIOTaskCtx, "ObLogEXTHandler"))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
CLOG_LOG(WARN, "allocate memory failed", KP(async_task_ctx));
} else if (OB_FAIL(async_task_ctx->init(async_task_count))) {
CLOG_LOG(WARN, "init ObLogExternalStorageIOTaskCtx failed", KP(async_task_ctx), K(async_task_count));
} else {
CLOG_LOG(TRACE, "begin construct async tasks", K(async_task_count), K(async_task_size),
K(remained_task_count), K(remained_total_size));
int64_t curr_read_offset = offset;
int64_t curr_read_buf_pos = 0;
// construct async task and push it into thread pool, last async task will be executed in current thread.
ObLogExternalStorageIOTask *last_io_task = NULL;
int64_t curr_task_idx = 0;
while (remained_task_count > 0) {
ObLogExternalStoragePreadTask *pread_task = NULL;
int64_t async_task_read_buf_size = std::min(remained_total_size, async_task_size);
construct_async_read_task_(uri, storage_info, curr_read_offset, read_buf + curr_read_buf_pos,
async_task_read_buf_size, real_read_size, curr_task_idx,
async_task_ctx, pread_task);
++curr_task_idx;
curr_read_offset += async_task_read_buf_size;
curr_read_buf_pos += async_task_read_buf_size;
remained_total_size -= async_task_read_buf_size;
last_io_task = pread_task;
CLOG_LOG(TRACE, "construct async tasks idx success", K(curr_task_idx), K(async_task_count), K(async_task_size),
K(remained_task_count), K(remained_total_size));
// NB: use current thread to execute last io task.
if (--remained_task_count > 0) {
push_async_task_into_thread_pool_(pread_task);
}
}
// defense code, last_io_task must not be NULL.
if (NULL != last_io_task) {
(void)last_io_task->do_task();
}
}
return ret;
}
int ObLogExternalStorageHandler::wait_async_tasks_finished_(
ObLogExternalStorageIOTaskCtx *async_task_ctx)
{
int ret = OB_SUCCESS;
const int64_t DEFAULT_WAIT_US = 50 * 1000;
int64_t print_log_interval = OB_INVALID_TIMESTAMP;
// if async_task_ctx->wait return OB_SUCCESS, means there is no flying task.
// if async_task_ctx->wait return error except OB_TIMEOUT, we can not return
// the errno until async_task_ctx has no flying task.
while (OB_FAIL(async_task_ctx->wait(DEFAULT_WAIT_US))
&& async_task_ctx->has_flying_async_task()) {
if (palf::palf_reach_time_interval(500*1000, print_log_interval)) {
CLOG_LOG(WARN, "wait ObLogExternalStorageIOTaskCtx failed", KPC(async_task_ctx));
}
}
// even if wait async_task_ctx failed, there is no flying async task at here.
if (OB_FAIL(ret)) {
CLOG_LOG(WARN, "wait ObLogExternalStorageIOTaskCtx failed", KPC(async_task_ctx));
}
return ret;
}
void ObLogExternalStorageHandler::construct_async_read_task_(
const common::ObString &uri,
const common::ObString &storage_info,
const int64_t offset,
char *read_buf,
const int64_t read_buf_size,
int64_t &real_read_size,
const int64_t task_idx,
ObLogExternalStorageIOTaskCtx *async_task_ctx,
ObLogExternalStoragePreadTask *&pread_task)
{
int ret = OB_SUCCESS;
ObTimeGuard time_guard("construct pread task", DEFAULT_TIME_GUARD_THRESHOLD);
int64_t print_log_interval = OB_INVALID_TIMESTAMP;
do {
RunningStatus *running_status = NULL;
if (OB_FAIL(async_task_ctx->get_running_status(task_idx, running_status))) {
CLOG_LOG(ERROR, "unexpected error!!!", K(task_idx), KP(running_status), KPC(async_task_ctx));
} else if (NULL == (pread_task = MTL_NEW(ObLogExternalStoragePreadTask,
"ObLogEXTHandler",
uri, storage_info,
running_status,
async_task_ctx,
handle_adapter_,
offset, read_buf_size,
read_buf, real_read_size))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
if (palf::palf_reach_time_interval(1*1000*1000, print_log_interval)) {
CLOG_LOG(WARN, "allocate memory failed", K(task_idx), KP(running_status), KPC(async_task_ctx));
}
} else {
CLOG_LOG(TRACE, "construct_async_read_task_ success", KPC(pread_task));
}
if (OB_FAIL(ret)) {
usleep(DEFAULT_RETRY_INTERVAL);
}
} while (OB_FAIL(ret));
}
void ObLogExternalStorageHandler::push_async_task_into_thread_pool_(
ObLogExternalStorageIOTask *io_task)
{
int ret = OB_SUCCESS;
ObTimeGuard time_guard("push pread task", DEFAULT_TIME_GUARD_THRESHOLD);
int64_t print_log_interval = OB_INVALID_TIMESTAMP;
do {
if (OB_FAIL(push(io_task))) {
if (palf::palf_reach_time_interval(1*1000*1000, print_log_interval)) {
CLOG_LOG(WARN, "push task into thread pool failed");
}
}
if (OB_FAIL(ret)) {
usleep(DEFAULT_RETRY_INTERVAL);
}
} while (OB_FAIL(ret));
}
void ObLogExternalStorageHandler::destroy_and_init_new_thread_pool_(
const int64_t new_concurrency)
{
int ret = OB_SUCCESS;
ObTimeGuard time_guard("resize impl", 10 * 1000);
// destroy currenty thread pool
ObSimpleThreadPool::stop();
time_guard.click("stop old thread pool");
ObSimpleThreadPool::wait();
time_guard.click("wait old thread pool");
ObSimpleThreadPool::destroy();
time_guard.click("destroy old thread pool");
do {
if (0 != new_concurrency
&& OB_FAIL(ObSimpleThreadPool::init(
new_concurrency, CAPACITY_COEFFICIENT * new_concurrency, "ObLogEXTTP", MTL_ID()))) {
CLOG_LOG(WARN, "init ObSimpleThreadPool failed", K(new_concurrency), KPC(this));
} else {
concurrency_ = new_concurrency;
capacity_ = CAPACITY_COEFFICIENT * new_concurrency;
}
if (OB_FAIL(ret)) {
usleep(DEFAULT_RETRY_INTERVAL);
}
} while (OB_FAIL(ret));
time_guard.click("creat enew thread pool");
CLOG_LOG_RET(WARN, OB_SUCCESS, "destroy_and_init_new_thread_pool_ success", K(time_guard), KPC(this));
}
} // end namespace logservice
} // end namespace oceanbase

View File

@ -0,0 +1,175 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEABASE_LOGSERVICE_OB_LOG_EXTERNAL_STORAGE_HANDLER_H_
#define OCEABASE_LOGSERVICE_OB_LOG_EXTERNAL_STORAGE_HANDLER_H_
#include <stdint.h> // int64_t
#include "lib/utility/ob_macro_utils.h" // DISALLOW_COPY_AND_ASSIGN
#include "lib/utility/ob_print_utils.h" // TO_STRING_KV
#include "lib/thread/ob_simple_thread_pool.h" // ObSimpleThreadPool
#include "lib/lock/ob_spin_lock.h" // ObSpinLock
#include "lib/lock/ob_tc_rwlock.h" // ObRWLock
#include "share/ob_errno.h" // errno
namespace oceanbase
{
namespace common
{
class ObString;
}
namespace logservice
{
class ObLogExternalStorageIOTaskHandleIAdapter;
class ObLogExternalStorageIOTask;
class ObLogExternalStoragePreadTask;
class ObLogExternalStorageIOTaskCtx;
class ObLogExternalStorageHandler : public ObSimpleThreadPool {
public:
ObLogExternalStorageHandler();
~ObLogExternalStorageHandler();
public:
// @brief: Initializing ObLogExternalStorageHandler
// @return value:
// OB_SUCCESS, initializing successfully.
// OB_NOT_INIT
// OB_INIT_TWICE
// OB_ALLOCATE_MEMORY_FAILED
int init();
// @brief: Starting thread pool with the specified concurrency.
// @return value:
// OB_SUCCESS
// OB_ALLOCATE_MEMORY_FAILED, allocate memory failed.
// OB_INVALID_ARGUMENT, invalid argument, concurrency must be greater than or equal to 0 and less than or equal 128.
//
// NB: if concurrency is 0, means all request will be executed in current thread.
int start(const int64_t concurrency);
// @brief: Stoping thread pool.
void stop();
// @brief: Waiting flying task finished.
void wait();
// @brief: Destroying thread pool.
void destroy();
// Thread safe
// @brief: Resizing the concurrency of thread pool, the new pread operation will be blocked.
// @param[in]: new_concurrency, expected concurrency of thread pool
// @param[in]: timeout_us.
// @return value
// OB_SUCCESS
// OB_EAGAIN, lock timeout
// OB_NOT_INIT
// OB_NOT_RUNNING
int resize(const int64_t new_concurrency,
const int64_t timeout_us = INT64_MAX);
// NB: Thread safe and synchronous interface.
// @brief: Reading up to count bytes from to a uri with storage info at a given offset.
// @param[in]: uri, a unique sequence of characters that identifies a logical or physical resource used by
// object storage system(e.g.: just like this 'oss://xxx/xxx/...').
// @param[in]: storage_info, the meta info used to access object storage system(e.g.: just like this
// 'endpoint&access_id&access_key').
// @param[in]: offset, the given offset which want to read.
// @param[in]: read_buf, the read buffer.
// @param[in]: read_buf_size, the maximum read length.
// @param[out]: real_read_size, the really read length.
// @return value:
// OB_SUCCESS, read successfully.
// OB_INVALID_ARGUMENT, invalid argument.
// OB_ALLOCATE_MEMORY_FAILED, allocate memory failed.
// OB_BACKUP_PERMISSION_DENIED, permission denied.
// OB_BACKUP_FILE_NOT_EXIST, uri not exist.
// OB_OSS_ERROR, oss error.
// OB_NOT_INIT
// OB_NOT_RUNNING
//
// NB:
// 1. A fixed number of asynchronous tasks will be generated based on the read length, and the minimum
// read length of single asynchronous task is 2MB.
// 2. Don't generate asynchronous task when concurrency_ is 0, and execute pread in current thread.
//
// Recommend:
// A maximum of N asynchronous tasks can be generated for a single pread operation(i.e. N =
// MIN(read_buf_size/2M, concurrency)), we recommend the total length of concurrent read
// requests not exceeded 2M*concurrency, in this way, pread can get better performance.
//
int pread(const common::ObString &uri,
const common::ObString &storage_info,
const int64_t offset,
char *read_buf,
const int64_t read_buf_size,
int64_t &real_read_size);
void handle(void *task) override final;
TO_STRING_KV(K_(concurrency), K_(capacity), K_(is_running), K_(is_inited), KP(handle_adapter_), KP(this));
private:
// CONCURRENCY LIMIT is 128.
// NB: max thread number of ObSimpleThreadPool is 256.
static const int64_t CONCURRENCY_LIMIT;
static const int64_t DEFAULT_RETRY_INTERVAL;
static const int64_t DEFAULT_TIME_GUARD_THRESHOLD;
static const int64_t DEFAULT_PREAD_TIME_GUARD_THRESHOLD;
static const int64_t DEFAULT_RESIZE_TIME_GUARD_THRESHOLD;
static const int64_t CAPACITY_COEFFICIENT;
static const int64_t SINGLE_TASK_MINIMUM_SIZE;
private:
bool is_valid_concurrency_(const int64_t concurrency) const;
int64_t get_async_task_count_(const int64_t total_size) const;
int construct_async_tasks_and_push_them_into_thread_pool_(
const common::ObString &uri,
const common::ObString &storage_info,
const int64_t offset,
char *read_buf,
const int64_t read_buf_size,
int64_t &real_read_size,
ObLogExternalStorageIOTaskCtx *&async_task_ctx);
int wait_async_tasks_finished_(ObLogExternalStorageIOTaskCtx *async_task_ctx);
void construct_async_read_task_(const common::ObString &uri,
const common::ObString &storage_info,
const int64_t offset,
char *read_buf,
const int64_t read_buf_size,
int64_t &real_read_size,
const int64_t task_idx,
ObLogExternalStorageIOTaskCtx *async_task_ctx,
ObLogExternalStoragePreadTask *&pread_task);
void push_async_task_into_thread_pool_(ObLogExternalStorageIOTask *io_task);
void destroy_and_init_new_thread_pool_(const int64_t concurrency);
private:
typedef common::RWLock RWLock;
typedef RWLock::RLockGuard RLockGuard;
typedef RWLock::WLockGuard WLockGuard;
typedef RWLock::WLockGuardWithTimeout WLockGuardTimeout;
int64_t concurrency_;
int64_t capacity_;
RWLock resize_rw_lock_;
ObSpinLock construct_async_task_lock_;
ObLogExternalStorageIOTaskHandleIAdapter *handle_adapter_;
bool is_running_;
bool is_inited_;
DISALLOW_COPY_AND_ASSIGN(ObLogExternalStorageHandler);
};
} // end namespace logservice
} // end namespace oceanbase
#endif

View File

@ -0,0 +1,457 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "ob_log_external_storage_io_task.h"
#include "lib/string/ob_string.h" // ObString
#include "lib/restore/ob_object_device.h" // ObObjectDevice
#include "share/ob_device_manager.h" // ObDeviceManager
namespace oceanbase
{
using namespace share;
namespace common
{
extern const char *OB_STORAGE_ACCESS_TYPES_STR[];
}
namespace logservice
{
RunningStatus::RunningStatus()
{
reset();
}
RunningStatus::~RunningStatus()
{
reset();
}
void RunningStatus::reset()
{
ret_ = -1;
main_thread_id_ = -1;
thread_id_ = -1;
logical_thread_id_ = -1;
status_ = EnumRunningStatus::INVALID_STATUS;
}
ObLogExternalStorageIOTaskCtx::ObLogExternalStorageIOTaskCtx()
: flying_task_count_(-1), total_task_count_(-1), running_status_(NULL), is_inited_(false)
{}
ObLogExternalStorageIOTaskCtx::~ObLogExternalStorageIOTaskCtx()
{
destroy();
}
int ObLogExternalStorageIOTaskCtx::init(const int64_t total_task_count)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
CLOG_LOG(WARN, "ObLogExternalStorageIOTaskCtx inited twice", K(total_task_count), KPC(this));
} else if (0 >= total_task_count) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "invalid argument", K(total_task_count));
} else if (OB_FAIL(condition_.init(ObWaitEventIds::LOG_EXTERNAL_STORAGE_IO_TASK_WAIT))) {
CLOG_LOG(WARN, "init thread conditions failed", K(total_task_count));
// set 'total_task_count_' at here
} else if (OB_FAIL(construct_running_status_(total_task_count))) {
CLOG_LOG(WARN, "construct_running_status_ failed", K(total_task_count));
} else {
flying_task_count_ = total_task_count;
is_inited_ = true;
CLOG_LOG(INFO, "ObLogExternalStorageIOTaskCtx init success", KPC(this), K(total_task_count));
}
if (OB_FAIL(ret) && OB_INIT_TWICE != ret) {
destroy();
}
return ret;
}
void ObLogExternalStorageIOTaskCtx::destroy()
{
is_inited_ = false;
flying_task_count_ = -1;
deconstruct_running_status_();
CLOG_LOG(INFO, "ObLogExternalStorageIOTaskCtx destroy success", KPC(this));
}
void ObLogExternalStorageIOTaskCtx::signal()
{
ObThreadCondGuard guard(condition_);
if (0 >= flying_task_count_) {
CLOG_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "unexpected ERROR, flying_task_count_ is less than 0", KPC(this));
} else {
flying_task_count_--;
}
if (0 == flying_task_count_) {
condition_.broadcast();
}
}
int ObLogExternalStorageIOTaskCtx::wait(const int64_t timeout_us)
{
int ret = OB_SUCCESS;
ObThreadCondGuard guard(condition_);
while (OB_SUCC(ret) && flying_task_count_ > 0) {
ret = condition_.wait_us(timeout_us);
}
return ret;
}
int ObLogExternalStorageIOTaskCtx::get_running_status(const int64_t idx,
RunningStatus *&running_status) const
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
CLOG_LOG(WARN, "ObLogExternalStorageIOTaskCtx not init", KPC(this));
} else if (0 > idx && total_task_count_ <= idx) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "invalid argument", KPC(this), K(idx));
} else {
running_status = running_status_ + idx;
running_status->logical_thread_id_ = idx;
running_status->main_thread_id_ = gettid();
CLOG_LOG(TRACE, "get_running_status success", KPC(this), K(idx), KP(running_status), KPC(running_status));
}
return ret;
}
int ObLogExternalStorageIOTaskCtx::get_ret_code() const
{
int ret = OB_SUCCESS;
for (int64_t i = 0; i < total_task_count_; i++) {
if (OB_SUCCESS != running_status_[i].ret_) {
ret = running_status_[i].ret_;
CLOG_LOG(WARN, "asyn task execute failed", KPC(this));
break;
}
}
return ret;
}
bool ObLogExternalStorageIOTaskCtx::has_flying_async_task() const
{
ObThreadCondGuard guard(condition_);
return flying_task_count_ > 0;
}
DEF_TO_STRING(ObLogExternalStorageIOTaskCtx)
{
int64_t pos = 0;
J_OBJ_START();
J_KV(K_(is_inited),
K_(total_task_count),
K_(flying_task_count),
KP(running_status_));
J_COMMA();
if (OB_NOT_NULL(running_status_)) {
J_ARRAY_START();
for (int64_t i = 0; i < total_task_count_; i++) {
J_OBJ_START();
J_KV("idx", i);
J_COMMA();
J_KV("data", running_status_[i]);
J_OBJ_END();
}
J_ARRAY_END();
}
J_OBJ_END();
return pos;
}
int ObLogExternalStorageIOTaskCtx::construct_running_status_(const int64_t total_task_count)
{
int ret = OB_SUCCESS;
if (NULL == (running_status_ =
reinterpret_cast<RunningStatus*>(mtl_malloc(sizeof(RunningStatus)*total_task_count, "ObLogEXT")))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
CLOG_LOG(WARN, "allocate memory failed", K(total_task_count), KP(running_status_));
} else {
total_task_count_ = total_task_count;
for (int64_t i = 0; i < total_task_count; i++) {
RunningStatus *ptr = running_status_ + i;
new (ptr)RunningStatus;
}
}
return ret;
}
void ObLogExternalStorageIOTaskCtx::deconstruct_running_status_()
{
if (OB_NOT_NULL(running_status_)) {
for (int64_t i = 0; i < total_task_count_; i++) {
RunningStatus *ptr = running_status_ + i;
ptr->~RunningStatus();
}
mtl_free(running_status_);
running_status_ = NULL;
total_task_count_ = -1;
CLOG_LOG(TRACE, "deconstruct_running_status_ success", KPC(this));
}
}
int get_and_init_io_device(const ObString &uri,
const ObString &storage_info,
ObIODevice *&io_device)
{
int ret = OB_SUCCESS;
ObIODOpts opts;
ObIODOpt opt;
opts.opts_ = &opt;
opts.opt_cnt_ = 1;
opt.key_ = "storage_info";
opt.value_.value_str = storage_info.ptr();
if (OB_FAIL(ObDeviceManager::get_instance().get_device(storage_info, uri, io_device))) {
CLOG_LOG(WARN, "get_device from ObDeviceManager failed", K(storage_info), K(uri), KP(io_device));
} else if (OB_FAIL(io_device->start(opts))) {
CLOG_LOG(WARN, "start io device failed", K(storage_info), K(uri), KP(io_device));
} else {
CLOG_LOG(TRACE, "get_io_device success", K(uri), K(storage_info), KP(io_device));
}
return ret;
}
void release_io_device(ObIODevice *&io_device)
{
if (NULL != io_device) {
(void)ObDeviceManager::get_instance().release_device(io_device);
}
io_device = NULL;
}
enum class OPEN_FLAG{
INVALID_FLAG = 0,
READ_FLAG = 1,
WRITE_FLAG = 2,
MAX_FLAG = 3
};
int convert_to_storage_access_type(const OPEN_FLAG &open_flag,
ObStorageAccessType &storage_access_type)
{
int ret = OB_SUCCESS;
if (OPEN_FLAG::READ_FLAG == open_flag) {
storage_access_type = ObStorageAccessType::OB_STORAGE_ACCESS_READER;
} else {
ret = OB_NOT_SUPPORTED;
CLOG_LOG(WARN, "not supported flag", K(open_flag));
}
return ret;
}
int open_io_fd(const ObString &uri,
const OPEN_FLAG open_flag,
ObIODevice *io_device,
ObIOFd &io_fd)
{
int ret = OB_SUCCESS;
ObIODOpt opt;
ObIODOpts iod_opts;
iod_opts.opts_ = &opt;
iod_opts.opt_cnt_ = 1;
// TODO by runlin: support write
ObStorageAccessType access_type;
if (OB_FAIL(convert_to_storage_access_type(open_flag, access_type))) {
CLOG_LOG(WARN, "convert_to_storage_access_type failed", K(open_flag));
} else if (FALSE_IT(opt.set("AccessType", OB_STORAGE_ACCESS_TYPES_STR[access_type]))) {
// flag=-1 and mode=0 are invalid, because ObObjectDevice does not use flag and mode;
} else if (OB_FAIL(io_device->open(uri.ptr(), -1, 0, io_fd, &iod_opts))) {
CLOG_LOG(WARN, "open fd failed", K(uri), K(open_flag));
} else {
CLOG_LOG(TRACE, "open fd success", K(uri), K(open_flag));
}
return ret;
}
int close_io_fd(ObIODevice *io_device,
const ObIOFd &io_fd)
{
int ret = OB_SUCCESS;
if (NULL == io_device || !io_fd.is_valid()) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "io device is empty");
} else if (OB_FAIL(io_device->close(io_fd))) {
CLOG_LOG(WARN, "fail to close fd!", K(io_fd));
} else {
CLOG_LOG(TRACE, "close_io_fd success", KP(io_device), K(io_fd));
}
return ret;
}
ObLogExternalStorageIOTaskHandleAdapter::ObLogExternalStorageIOTaskHandleAdapter() {}
ObLogExternalStorageIOTaskHandleAdapter::~ObLogExternalStorageIOTaskHandleAdapter() {}
int ObLogExternalStorageIOTaskHandleAdapter::exist(const ObString &uri,
const ObString &storage_info,
bool &exist)
{
int ret = OB_SUCCESS;
exist = false;
ObIODevice *io_device = NULL;
if (OB_FAIL(get_and_init_io_device(uri, storage_info, io_device))) {
CLOG_LOG(WARN, "get_io_device failed", K(uri), K(storage_info), KP(io_device));
} else if (OB_FAIL(io_device->exist(uri.ptr(), exist))) {
CLOG_LOG(WARN, "exist failed", K(uri), K(storage_info), KP(io_device), K(exist));
} else {
CLOG_LOG(TRACE, "exist success", K(uri), K(storage_info), KP(io_device), K(exist));
}
release_io_device(io_device);
return ret;
}
int ObLogExternalStorageIOTaskHandleAdapter::get_file_size(const ObString &uri,
const ObString &storage_info,
int64_t &file_size)
{
int ret = OB_SUCCESS;
file_size = 0;
ObIODFileStat file_stat;
ObIODevice *io_device = NULL;
if (OB_FAIL(get_and_init_io_device(uri, storage_info, io_device))) {
CLOG_LOG(WARN, "get_io_device failed", K(uri), K(storage_info), KP(io_device));
} else if (OB_FAIL(io_device->stat(uri.ptr(), file_stat))) {
CLOG_LOG(WARN, "stat io deveice failed", K(uri));
} else {
file_size = file_stat.size_;
CLOG_LOG(TRACE, "get_file_size success", K(uri), K(storage_info), KP(io_device), K(file_size));
}
release_io_device(io_device);
return ret;
}
int ObLogExternalStorageIOTaskHandleAdapter::pread(const ObString &uri,
const ObString &storage_info,
const int64_t offset,
char *buf,
const int64_t read_buf_size,
int64_t &real_read_size)
{
ObTimeGuard time_guard("oss pread", 200 * 1000);
int ret = OB_SUCCESS;
real_read_size = 0;
ObIODevice *io_device = NULL;
ObIOFd io_fd;
if (OB_FAIL(get_and_init_io_device(uri, storage_info, io_device))) {
CLOG_LOG(WARN, "get_io_device failed", K(io_fd), K(offset), K(read_buf_size), KP(buf), K(real_read_size));
} else if (FALSE_IT(time_guard.click("after get_io_device"))) {
} else if (OB_FAIL(open_io_fd(uri, OPEN_FLAG::READ_FLAG, io_device, io_fd))) {
} else if (FALSE_IT(time_guard.click("after open_io_fd"))) {
CLOG_LOG(WARN, "open_io_fd failed", K(io_fd), K(offset), K(read_buf_size), KP(buf), K(real_read_size));
} else if (OB_FAIL(io_device->pread(io_fd, offset, read_buf_size, buf, real_read_size))) {
CLOG_LOG(WARN, "pread failed", K(io_fd), K(offset), K(read_buf_size), KP(buf), K(real_read_size));
} else if (FALSE_IT(time_guard.click("after pread"))) {
} else {
CLOG_LOG(TRACE, "pread success", K(time_guard), K(io_fd), K(offset), K(read_buf_size), KP(buf), K(real_read_size));
}
(void)close_io_fd(io_device, io_fd);
(void)release_io_device(io_device);
return ret;
}
ObLogExternalStorageIOTask::ObLogExternalStorageIOTask(const ObString &uri,
const ObString &storage_info,
RunningStatus *running_status,
ObLogExternalStorageIOTaskCtx *io_task_ctx,
ObLogExternalStorageIOTaskHandleIAdapter *adapter)
: uri_ob_str_(OB_MAX_URI_LENGTH, 0, uri_str_),
storage_info_ob_str_(OB_MAX_BACKUP_STORAGE_INFO_LENGTH, 0, storage_info_str_),
generate_ts_(OB_INVALID_TIMESTAMP),
do_task_ts_(OB_INVALID_TIMESTAMP),
running_status_(running_status),
io_task_ctx_(io_task_ctx),
type_(ObLogExternalStorageIOTaskType::INVALID_TYPE),
adapter_(adapter)
{
memset(uri_str_, '\0', OB_MAX_URI_LENGTH);
memset(storage_info_str_, '\0', OB_MAX_BACKUP_STORAGE_INFO_LENGTH);
uri_ob_str_.write(uri.ptr(), uri.length());
storage_info_ob_str_.write(storage_info.ptr(), storage_info.length());
generate_ts_ = ObTimeUtility::current_time();
}
ObLogExternalStorageIOTask::~ObLogExternalStorageIOTask()
{
memset(uri_str_, '\0', OB_MAX_URI_LENGTH);
memset(storage_info_str_, '\0', OB_MAX_BACKUP_STORAGE_INFO_LENGTH);
uri_ob_str_.reset();
storage_info_ob_str_.reset();
generate_ts_ = OB_INVALID_TIMESTAMP;
do_task_ts_ = OB_INVALID_TIMESTAMP;
running_status_ = NULL;
io_task_ctx_ = NULL;
type_ = ObLogExternalStorageIOTaskType::INVALID_TYPE;
adapter_ = NULL;
}
int ObLogExternalStorageIOTask::do_task()
{
int ret = OB_SUCCESS;
do_task_ts_ = ObTimeUtility::current_time();
running_status_->thread_id_ = gettid();
running_status_->status_ = EnumRunningStatus::START_STATUS;
int64_t io_delay = do_task_ts_ - generate_ts_;
if (io_delay >= 500 * 1000) {
CLOG_LOG(WARN, "[io delay] handle io task delay too much", K(io_delay), KPC(this));
}
if (OB_FAIL(do_task_())) {
CLOG_LOG(WARN, "do_task_ failed", KP(this));
}
return ret;
}
ObLogExternalStoragePreadTask::ObLogExternalStoragePreadTask(const ObString &uri,
const ObString &storage_info,
RunningStatus *running_status,
ObLogExternalStorageIOTaskCtx *io_task_ctx,
ObLogExternalStorageIOTaskHandleIAdapter *adapter,
const int64_t offset,
const int64_t read_buf_size,
char *read_buf,
int64_t &real_read_size)
: ObLogExternalStorageIOTask(uri, storage_info, running_status, io_task_ctx, adapter),
offset_(offset),
read_buf_size_(read_buf_size),
read_buf_(read_buf),
real_read_size_(real_read_size)
{
type_ = ObLogExternalStorageIOTaskType::PREAD_TYPE;
}
ObLogExternalStoragePreadTask::~ObLogExternalStoragePreadTask()
{
offset_ = -1;
read_buf_size_ = -1;
read_buf_ = NULL;
}
int ObLogExternalStoragePreadTask::do_task_()
{
int ret = OB_SUCCESS;
int64_t tmp_real_read_size = 0;
if (OB_FAIL(adapter_->pread(uri_ob_str_, storage_info_ob_str_, offset_,
read_buf_, read_buf_size_, tmp_real_read_size))) {
CLOG_LOG(WARN, "pread failed", KPC(this));
} else {
ATOMIC_AAF(&real_read_size_, tmp_real_read_size);
}
running_status_->ret_ = ret;
running_status_->status_ = EnumRunningStatus::FINSHED_STATUS;
CLOG_LOG(TRACE, "finished pread", KPC(this), K(tmp_real_read_size));
// NB: don't use io_task_ctx_ or KPC(this) after signal.
// consider used shared ptr to manage memory release of ObLogExternalStorageIOTaskCtx
io_task_ctx_->signal();
return ret;
}
} // end namespace logservice
} // end namespace oceanbase

View File

@ -0,0 +1,205 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEABASE_LOGSERVICE_OB_LOG_EXTERNAL_STORAGE_IO_TASK_H_
#define OCEABASE_LOGSERVICE_OB_LOG_EXTERNAL_STORAGE_IO_TASK_H_
#include "lib/utility/ob_macro_utils.h" // DISALLOW_COPY_AND_ASSIGN
#include "lib/utility/ob_print_utils.h" // TO_STRING_KV
#include "lib//ob_define.h" // OB_MAX_URI_LENGTH
#include "lib/lock/ob_thread_cond.h" // ObThreadCond
#include "share/backup/ob_backup_struct.h" // OB_MAX_BACKUP_STORAGE_INFO_LENGTH
namespace oceanbase
{
namespace common
{
class ObString;
}
namespace logservice
{
enum class EnumRunningStatus {
INVALID_STATUS = 0,
START_STATUS = 1,
FINSHED_STATUS = 2,
MAX_STATUS = 3
};
struct RunningStatus
{
RunningStatus();
~RunningStatus();
void reset();
int ret_;
// For debug
int64_t main_thread_id_;
int64_t thread_id_;
int64_t logical_thread_id_;
EnumRunningStatus status_;
TO_STRING_KV(KR(ret_), K_(main_thread_id), K_(thread_id), K_(logical_thread_id), K_(status));
};
class ObLogExternalStorageIOTaskCtx {
public:
ObLogExternalStorageIOTaskCtx();
~ObLogExternalStorageIOTaskCtx();
public:
int init(const int64_t total_task_count);
void destroy();
// @brief: called by the executer of ObLogExternalStorageIOTask after it has finished.
void signal();
// @brief: wait up to timeout_us until there is no flying task.
// @return value
// OB_SUCCESS
// OB_TIMEOUT, timeout
// others, system error
int wait(int64_t timeout_us);
int get_running_status(const int64_t idx,
RunningStatus *&running_stats) const;
int get_ret_code() const;
bool has_flying_async_task() const;
DECLARE_TO_STRING;
private:
int construct_running_status_(const int64_t total_task_count);
void deconstruct_running_status_();
private:
int64_t flying_task_count_;
int64_t total_task_count_;
mutable ObThreadCond condition_;
RunningStatus *running_status_;
bool is_inited_;
DISALLOW_COPY_AND_ASSIGN(ObLogExternalStorageIOTaskCtx);
};
enum class ObLogExternalStorageIOTaskType {
INVALID_TYPE = 0,
PREAD_TYPE = 1,
MAX_TYPE = 2
};
class ObLogExternalStorageIOTaskHandleIAdapter {
public:
ObLogExternalStorageIOTaskHandleIAdapter() {}
virtual ~ObLogExternalStorageIOTaskHandleIAdapter() {}
// Interface
public:
virtual int exist(const common::ObString &uri,
const common::ObString &storage_info,
bool &exist) = 0;
virtual int get_file_size(const common::ObString &uri,
const common::ObString &storage_info,
int64_t &file_size) = 0;
virtual int pread(const common::ObString &uri,
const common::ObString &storage_info,
const int64_t offset,
char *buf,
const int64_t read_buf_size,
int64_t &real_read_size) = 0;
};
class ObLogExternalStorageIOTaskHandleAdapter : public ObLogExternalStorageIOTaskHandleIAdapter {
public:
ObLogExternalStorageIOTaskHandleAdapter();
~ObLogExternalStorageIOTaskHandleAdapter() override;
// Implement
public:
int exist(const common::ObString &uri,
const common::ObString &storage_info,
bool &exist) override final;
int get_file_size(const common::ObString &uri,
const common::ObString &storage_info,
int64_t &file_size) override final;
int pread(const common::ObString &uri,
const common::ObString &storage_info,
const int64_t offset,
char *buf,
const int64_t read_buf_size,
int64_t &real_read_size) override final;
};
class ObLogExternalStorageIOTask {
public:
ObLogExternalStorageIOTask(const ObString &uri,
const ObString &storage_info,
RunningStatus *running_status,
ObLogExternalStorageIOTaskCtx *io_task_ctx,
ObLogExternalStorageIOTaskHandleIAdapter *adapter);
virtual ~ObLogExternalStorageIOTask();
public:
int do_task();
VIRTUAL_TO_STRING_KV("BaseClass", "ObLogExternalStorageIOTask",
"uri", uri_ob_str_,
"storage_info", storage_info_ob_str_,
"generate_ts", generate_ts_,
"do_task_ts", do_task_ts_,
"type", type_,
KP(running_status_),
KPC(io_task_ctx_));
private:
virtual int do_task_() = 0;
protected:
ObString uri_ob_str_;
char uri_str_[OB_MAX_URI_LENGTH];
ObString storage_info_ob_str_;
char storage_info_str_[share::OB_MAX_BACKUP_STORAGE_INFO_LENGTH];
int64_t generate_ts_;
int64_t do_task_ts_;
RunningStatus *running_status_;
ObLogExternalStorageIOTaskCtx *io_task_ctx_;
ObLogExternalStorageIOTaskType type_;
ObLogExternalStorageIOTaskHandleIAdapter *adapter_;
DISALLOW_COPY_AND_ASSIGN(ObLogExternalStorageIOTask);
};
class ObLogExternalStoragePreadTask : public ObLogExternalStorageIOTask {
public:
ObLogExternalStoragePreadTask(const ObString &uri,
const ObString &storage_info,
RunningStatus *running_status,
ObLogExternalStorageIOTaskCtx *io_task_ctx,
ObLogExternalStorageIOTaskHandleIAdapter *adapter,
const int64_t offset,
const int64_t read_buf_size,
char *read_buf,
int64_t &real_read_size);
~ObLogExternalStoragePreadTask() override;
public:
INHERIT_TO_STRING_KV("ObLogExternalStorageTask", ObLogExternalStorageIOTask,
K_(offset), K_(read_buf_size), KP(read_buf_), K_(real_read_size));
private:
int do_task_() override final;
private:
int64_t offset_;
int64_t read_buf_size_;
char *read_buf_;
// shared variable between different threads, need use atomic operation.
int64_t &real_read_size_;
};
} // end namespace logservice
} // end namespace oceanbase
#endif