[PALF] add LogIOContext and LogIOUser
This commit is contained in:
@ -396,16 +396,19 @@ TEST_F(TestObSimpleLogClusterLogEngine, exception_path)
|
|||||||
// 测试目录清空后,读数据是否正常报错
|
// 测试目录清空后,读数据是否正常报错
|
||||||
ReadBufGuard buf_guard("dummy", 100);
|
ReadBufGuard buf_guard("dummy", 100);
|
||||||
int64_t out_read_size;
|
int64_t out_read_size;
|
||||||
|
LogIOContext io_ctx(LogIOUser::DEFAULT);
|
||||||
EXPECT_EQ(OB_ERR_OUT_OF_UPPER_BOUND,
|
EXPECT_EQ(OB_ERR_OUT_OF_UPPER_BOUND,
|
||||||
log_storage->pread(LSN((truncate_prefix_block_id + 1) * PALF_BLOCK_SIZE),
|
log_storage->pread(LSN((truncate_prefix_block_id + 1) * PALF_BLOCK_SIZE),
|
||||||
100,
|
100,
|
||||||
buf_guard.read_buf_,
|
buf_guard.read_buf_,
|
||||||
out_read_size));
|
out_read_size,
|
||||||
|
io_ctx));
|
||||||
EXPECT_EQ(OB_ERR_OUT_OF_LOWER_BOUND,
|
EXPECT_EQ(OB_ERR_OUT_OF_LOWER_BOUND,
|
||||||
log_storage->pread(LSN((truncate_prefix_block_id - 1) * PALF_BLOCK_SIZE),
|
log_storage->pread(LSN((truncate_prefix_block_id - 1) * PALF_BLOCK_SIZE),
|
||||||
100,
|
100,
|
||||||
buf_guard.read_buf_,
|
buf_guard.read_buf_,
|
||||||
out_read_size));
|
out_read_size,
|
||||||
|
io_ctx));
|
||||||
// 测试目录清空后,重启是否正常
|
// 测试目录清空后,重启是否正常
|
||||||
EXPECT_EQ(OB_SUCCESS, reload(log_engine_->log_storage_.log_tail_, log_engine_->log_meta_storage_.log_tail_, log_engine_->log_meta_.log_snapshot_meta_.base_lsn_));
|
EXPECT_EQ(OB_SUCCESS, reload(log_engine_->log_storage_.log_tail_, log_engine_->log_meta_storage_.log_tail_, log_engine_->log_meta_.log_snapshot_meta_.base_lsn_));
|
||||||
|
|
||||||
|
|||||||
@ -632,13 +632,14 @@ int LogEngine::read_log(const LSN &lsn,
|
|||||||
int64_t &out_read_size)
|
int64_t &out_read_size)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
LogIOContext io_ctx(LogIOUser::DEFAULT);
|
||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
PALF_LOG(ERROR, "LogEngine not inited!!!", K(ret), K_(palf_id), K_(is_inited));
|
PALF_LOG(ERROR, "LogEngine not inited!!!", K(ret), K_(palf_id), K_(is_inited));
|
||||||
} else if (false == lsn.is_valid() || 0 >= in_read_size || false == read_buf.is_valid()) {
|
} else if (false == lsn.is_valid() || 0 >= in_read_size || false == read_buf.is_valid()) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
PALF_LOG(ERROR, "Invalid argument!!!", K(ret), K_(palf_id), K_(is_inited), K(lsn), K(in_read_size), K(read_buf));
|
PALF_LOG(ERROR, "Invalid argument!!!", K(ret), K_(palf_id), K_(is_inited), K(lsn), K(in_read_size), K(read_buf));
|
||||||
} else if (OB_FAIL(log_storage_.pread(lsn, in_read_size, read_buf, out_read_size))) {
|
} else if (OB_FAIL(log_storage_.pread(lsn, in_read_size, read_buf, out_read_size, io_ctx))) {
|
||||||
PALF_LOG(ERROR, "LogEngine read_log failed", K(ret), K(lsn), K(in_read_size), K(read_buf));
|
PALF_LOG(ERROR, "LogEngine read_log failed", K(ret), K(lsn), K(in_read_size), K(read_buf));
|
||||||
} else {
|
} else {
|
||||||
PALF_LOG(TRACE, "LogEngine read_log success", K(ret), K(lsn), K(read_buf), K(out_read_size));
|
PALF_LOG(TRACE, "LogEngine read_log success", K(ret), K(lsn), K(read_buf), K(out_read_size));
|
||||||
@ -1301,12 +1302,13 @@ int LogEngine::construct_log_meta_(const LSN &lsn, block_id_t &expected_next_blo
|
|||||||
ReadBufGuard guard("LogEngine", buf_len);
|
ReadBufGuard guard("LogEngine", buf_len);
|
||||||
ReadBuf &read_buf = guard.read_buf_;
|
ReadBuf &read_buf = guard.read_buf_;
|
||||||
LogMetaEntry meta_entry;
|
LogMetaEntry meta_entry;
|
||||||
|
LogIOContext io_ctx(LogIOUser::RESTART);
|
||||||
if (false == lsn.is_valid()) {
|
if (false == lsn.is_valid()) {
|
||||||
PALF_LOG(INFO, "there is no meta entry, maybe create palf failed", K(ret), K_(palf_id), K_(is_inited));
|
PALF_LOG(INFO, "there is no meta entry, maybe create palf failed", K(ret), K_(palf_id), K_(is_inited));
|
||||||
} else if (!read_buf.is_valid()) {
|
} else if (!read_buf.is_valid()) {
|
||||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
PALF_LOG(WARN, "allocate memory failed", KPC(this), K(lsn));
|
PALF_LOG(WARN, "allocate memory failed", KPC(this), K(lsn));
|
||||||
} else if (OB_FAIL(log_meta_storage_.pread(lsn, buf_len, read_buf, out_read_size))) {
|
} else if (OB_FAIL(log_meta_storage_.pread(lsn, buf_len, read_buf, out_read_size, io_ctx))) {
|
||||||
PALF_LOG(WARN, "ObLogMetaStorage pread failed", K(ret), K_(palf_id), K_(is_inited));
|
PALF_LOG(WARN, "ObLogMetaStorage pread failed", K(ret), K_(palf_id), K_(is_inited));
|
||||||
// NB: when lsn is invalid, means there is no data on disk.
|
// NB: when lsn is invalid, means there is no data on disk.
|
||||||
} else if (OB_FAIL(meta_entry.deserialize(read_buf.buf_, buf_len, pos))) {
|
} else if (OB_FAIL(meta_entry.deserialize(read_buf.buf_, buf_len, pos))) {
|
||||||
|
|||||||
74
src/logservice/palf/log_io_context.h
Normal file
74
src/logservice/palf/log_io_context.h
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
/**
|
||||||
|
* Copyright (c) 2021 OceanBase
|
||||||
|
* OceanBase CE is licensed under Mulan PubL v2.
|
||||||
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||||
|
* You may obtain a copy of Mulan PubL v2 at:
|
||||||
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||||
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||||
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||||
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||||
|
* See the Mulan PubL v2 for more details.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef OCEANBASE_LOGSERVICE_LOG_IO_CONTEXT_
|
||||||
|
#define OCEANBASE_LOGSERVICE_LOG_IO_CONTEXT_
|
||||||
|
#include <cstdint>
|
||||||
|
#include "lib/utility/ob_print_utils.h"
|
||||||
|
|
||||||
|
namespace oceanbase
|
||||||
|
{
|
||||||
|
namespace palf
|
||||||
|
{
|
||||||
|
enum class LogIOUser {
|
||||||
|
DEFAULT = 0,
|
||||||
|
REPLAY = 1,
|
||||||
|
FETCHLOG = 2,
|
||||||
|
ARCHIVE = 3,
|
||||||
|
RESTORE = 4,
|
||||||
|
CDC = 5,
|
||||||
|
STANDBY = 6,
|
||||||
|
SHARED_UPLOAD = 7,
|
||||||
|
META_INFO = 8,
|
||||||
|
RESTART = 9,
|
||||||
|
OTHER = 10,
|
||||||
|
};
|
||||||
|
|
||||||
|
inline const char *log_io_user_str(const LogIOUser user_type)
|
||||||
|
{
|
||||||
|
#define USER_TYPE_STR(x) case(LogIOUser::x): return #x
|
||||||
|
switch (user_type)
|
||||||
|
{
|
||||||
|
USER_TYPE_STR(DEFAULT);
|
||||||
|
USER_TYPE_STR(REPLAY);
|
||||||
|
USER_TYPE_STR(FETCHLOG);
|
||||||
|
USER_TYPE_STR(ARCHIVE);
|
||||||
|
USER_TYPE_STR(RESTORE);
|
||||||
|
USER_TYPE_STR(CDC);
|
||||||
|
USER_TYPE_STR(STANDBY);
|
||||||
|
USER_TYPE_STR(SHARED_UPLOAD);
|
||||||
|
USER_TYPE_STR(META_INFO);
|
||||||
|
USER_TYPE_STR(RESTART);
|
||||||
|
USER_TYPE_STR(OTHER);
|
||||||
|
default:
|
||||||
|
return "Invalid";
|
||||||
|
}
|
||||||
|
#undef USER_TYPE_STR
|
||||||
|
}
|
||||||
|
|
||||||
|
class LogIOContext
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
LogIOContext(const LogIOUser &user) : user_(user) { }
|
||||||
|
~LogIOContext() { destroy(); }
|
||||||
|
void destroy()
|
||||||
|
{
|
||||||
|
user_ = LogIOUser::DEFAULT;
|
||||||
|
}
|
||||||
|
TO_STRING_KV("user", log_io_user_str(user_));
|
||||||
|
|
||||||
|
private:
|
||||||
|
LogIOUser user_;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
@ -163,9 +163,10 @@ int MemoryStorage::append(const char *buf, const int64_t buf_len)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int MemoryStorage::pread(const LSN &lsn, const int64_t in_read_size, ReadBuf &read_buf, int64_t &out_read_size)
|
int MemoryStorage::pread(const LSN &lsn, const int64_t in_read_size, ReadBuf &read_buf, int64_t &out_read_size, LogIOContext &io_ctx)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
UNUSED(io_ctx);
|
||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
} else if (false == lsn.is_valid() || 0 >= in_read_size) {
|
} else if (false == lsn.is_valid() || 0 >= in_read_size) {
|
||||||
@ -202,7 +203,8 @@ int MemoryIteratorStorage::read_data_from_storage_(
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
const LSN start_lsn = start_lsn_ + pos;
|
const LSN start_lsn = start_lsn_ + pos;
|
||||||
if (OB_FAIL(log_storage_->pread(start_lsn, in_read_size, read_buf_, out_read_size))) {
|
LogIOContext io_ctx(LogIOUser::DEFAULT);
|
||||||
|
if (OB_FAIL(log_storage_->pread(start_lsn, in_read_size, read_buf_, out_read_size, io_ctx))) {
|
||||||
PALF_LOG(WARN, "MemoryIteratorStorage pread failed", K(ret), KPC(this), K(start_lsn));
|
PALF_LOG(WARN, "MemoryIteratorStorage pread failed", K(ret), KPC(this), K(start_lsn));
|
||||||
} else {
|
} else {
|
||||||
PALF_LOG(TRACE, "MemoryIteratorStorage read_data_from_storage_ success", K(ret), KPC(this), K(start_lsn));
|
PALF_LOG(TRACE, "MemoryIteratorStorage read_data_from_storage_ success", K(ret), KPC(this), K(start_lsn));
|
||||||
@ -237,12 +239,13 @@ int DiskIteratorStorage::read_data_from_storage_(
|
|||||||
const LSN curr_round_read_lsn = start_lsn_ + pos + remain_valid_data_size;
|
const LSN curr_round_read_lsn = start_lsn_ + pos + remain_valid_data_size;
|
||||||
const int64_t real_in_read_size = in_read_size - remain_valid_data_size;
|
const int64_t real_in_read_size = in_read_size - remain_valid_data_size;
|
||||||
read_buf_.buf_ += remain_valid_data_size;
|
read_buf_.buf_ += remain_valid_data_size;
|
||||||
|
LogIOContext io_ctx(LogIOUser::DEFAULT);
|
||||||
if (0ul == real_in_read_size) {
|
if (0ul == real_in_read_size) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
PALF_LOG(ERROR, "real read size is zero, unexpected error!!!", K(ret), K(real_in_read_size));
|
PALF_LOG(ERROR, "real read size is zero, unexpected error!!!", K(ret), K(real_in_read_size));
|
||||||
} else if (OB_FAIL(log_storage_->pread(curr_round_read_lsn,
|
} else if (OB_FAIL(log_storage_->pread(curr_round_read_lsn,
|
||||||
real_in_read_size,
|
real_in_read_size,
|
||||||
read_buf_, out_read_size))) {
|
read_buf_, out_read_size, io_ctx))) {
|
||||||
PALF_LOG(WARN, "ILogStorage pread failed", K(ret), K(pos), K(in_read_size), KPC(this));
|
PALF_LOG(WARN, "ILogStorage pread failed", K(ret), K(pos), K(in_read_size), KPC(this));
|
||||||
}
|
}
|
||||||
read_buf_.buf_ -= remain_valid_data_size;
|
read_buf_.buf_ -= remain_valid_data_size;
|
||||||
|
|||||||
@ -18,6 +18,7 @@
|
|||||||
#include "lib/utility/ob_print_utils.h"
|
#include "lib/utility/ob_print_utils.h"
|
||||||
#include "lib/utility/ob_utility.h"
|
#include "lib/utility/ob_utility.h"
|
||||||
#include "lib/function/ob_function.h" // ObFunction
|
#include "lib/function/ob_function.h" // ObFunction
|
||||||
|
#include "log_io_context.h"
|
||||||
#include "log_storage_interface.h"
|
#include "log_storage_interface.h"
|
||||||
#include "lsn.h"
|
#include "lsn.h"
|
||||||
#include "log_reader_utils.h"
|
#include "log_reader_utils.h"
|
||||||
@ -73,7 +74,7 @@ public:
|
|||||||
void destroy();
|
void destroy();
|
||||||
bool is_inited() const { return is_inited_; }
|
bool is_inited() const { return is_inited_; }
|
||||||
int append(const char *buf, const int64_t buf_len);
|
int append(const char *buf, const int64_t buf_len);
|
||||||
int pread(const LSN& lsn, const int64_t in_read_size, ReadBuf &read_buf, int64_t &out_read_size) final;
|
int pread(const LSN& lsn, const int64_t in_read_size, ReadBuf &read_buf, int64_t &out_read_size, LogIOContext &io_ctx) final;
|
||||||
TO_STRING_KV(K_(start_lsn), K_(log_tail), K_(buf), K_(buf_len), K_(is_inited));
|
TO_STRING_KV(K_(start_lsn), K_(log_tail), K_(buf), K_(buf_len), K_(is_inited));
|
||||||
private:
|
private:
|
||||||
const char *buf_;
|
const char *buf_;
|
||||||
|
|||||||
@ -271,11 +271,15 @@ int LogStorage::append_meta(const char *buf, const int64_t buf_len)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int LogStorage::pread(const LSN &read_lsn, const int64_t in_read_size, ReadBuf &read_buf,
|
int LogStorage::pread(const LSN &read_lsn,
|
||||||
int64_t &out_read_size)
|
const int64_t in_read_size,
|
||||||
|
ReadBuf &read_buf,
|
||||||
|
int64_t &out_read_size,
|
||||||
|
LogIOContext &io_ctx)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
bool need_read_with_block_header = false;
|
bool need_read_with_block_header = false;
|
||||||
|
UNUSED(io_ctx);
|
||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
PALF_LOG(ERROR, "LogStorage not inited!!!", K(ret));
|
PALF_LOG(ERROR, "LogStorage not inited!!!", K(ret));
|
||||||
|
|||||||
@ -88,7 +88,8 @@ public:
|
|||||||
int pread(const LSN &lsn,
|
int pread(const LSN &lsn,
|
||||||
const int64_t in_read_size,
|
const int64_t in_read_size,
|
||||||
ReadBuf &read_buf,
|
ReadBuf &read_buf,
|
||||||
int64_t &out_read_size) final;
|
int64_t &out_read_size,
|
||||||
|
LogIOContext &io_ctx) final;
|
||||||
|
|
||||||
int pread_without_block_header(const LSN &read_lsn,
|
int pread_without_block_header(const LSN &read_lsn,
|
||||||
const int64_t in_read_size,
|
const int64_t in_read_size,
|
||||||
|
|||||||
@ -19,6 +19,7 @@ namespace palf
|
|||||||
{
|
{
|
||||||
class ReadBuf;
|
class ReadBuf;
|
||||||
class LSN;
|
class LSN;
|
||||||
|
class LogIOContext;
|
||||||
class ILogStorage
|
class ILogStorage
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -31,7 +32,8 @@ public:
|
|||||||
virtual int pread(const LSN &lsn,
|
virtual int pread(const LSN &lsn,
|
||||||
const int64_t in_read_size,
|
const int64_t in_read_size,
|
||||||
ReadBuf &read_buf,
|
ReadBuf &read_buf,
|
||||||
int64_t &out_read_size) = 0;
|
int64_t &out_read_size,
|
||||||
|
LogIOContext &io_ctx) = 0;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user