dec the sizeof(Obsharedblockreadhandle)
This commit is contained in:
@ -246,9 +246,13 @@ bool ObSharedBlockWriteHandle::is_valid() const
|
|||||||
|
|
||||||
bool ObSharedBlockReadHandle::is_valid() const
|
bool ObSharedBlockReadHandle::is_valid() const
|
||||||
{
|
{
|
||||||
return macro_handles_.count() > 0 && addrs_.count() == 1;
|
return macro_handle_.is_valid();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool ObSharedBlockReadHandle::is_empty() const
|
||||||
|
{
|
||||||
|
return macro_handle_.is_empty();
|
||||||
|
}
|
||||||
ObSharedBlockReadHandle::ObSharedBlockReadHandle(const ObSharedBlockReadHandle &other)
|
ObSharedBlockReadHandle::ObSharedBlockReadHandle(const ObSharedBlockReadHandle &other)
|
||||||
{
|
{
|
||||||
*this = other;
|
*this = other;
|
||||||
@ -257,18 +261,21 @@ ObSharedBlockReadHandle::ObSharedBlockReadHandle(const ObSharedBlockReadHandle &
|
|||||||
ObSharedBlockReadHandle &ObSharedBlockReadHandle::operator=(const ObSharedBlockReadHandle &other)
|
ObSharedBlockReadHandle &ObSharedBlockReadHandle::operator=(const ObSharedBlockReadHandle &other)
|
||||||
{
|
{
|
||||||
if (&other != this) {
|
if (&other != this) {
|
||||||
addrs_ = other.addrs_;
|
macro_handle_ = other.macro_handle_;
|
||||||
macro_handles_ = other.macro_handles_;
|
|
||||||
}
|
}
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObSharedBlockReadHandle::wait()
|
int ObSharedBlockReadHandle::wait(const int64_t timeout_ms)
|
||||||
{
|
{
|
||||||
// TODO(zhuixin.gsy) use timeout_ms to wait
|
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_FAIL(ObSharedBlockBaseHandle::wait())) {
|
const int64_t io_timeout_ms = timeout_ms > 0
|
||||||
LOG_WARN("Fail to wait io finish", K(ret));
|
? timeout_ms : MAX(GCONF._data_storage_io_timeout / 1000, DEFAULT_IO_WAIT_TIME_MS);
|
||||||
|
if (OB_UNLIKELY(!macro_handle_.is_valid())) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("unexpected read handle", K(ret), K_(macro_handle));
|
||||||
|
} else if (OB_FAIL(macro_handle_.wait(io_timeout_ms))) {
|
||||||
|
LOG_WARN("Failt to wait macro handle finish", K(ret), K(macro_handle_), K(io_timeout_ms));
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -276,12 +283,11 @@ int ObSharedBlockReadHandle::wait()
|
|||||||
int ObSharedBlockReadHandle::get_data(ObIAllocator &allocator, char *&buf, int64_t &buf_len)
|
int ObSharedBlockReadHandle::get_data(ObIAllocator &allocator, char *&buf, int64_t &buf_len)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_FAIL(ObSharedBlockBaseHandle::wait())) {
|
if (OB_FAIL(wait())) {
|
||||||
LOG_WARN("Fail to wait io finish", K(ret));
|
LOG_WARN("Fail to wait io finish", K(ret));
|
||||||
} else {
|
} else {
|
||||||
ObMacroBlockHandle ¯o_handle = macro_handles_.at(0);
|
const char *data_buf = macro_handle_.get_buffer();
|
||||||
const char *data_buf = macro_handle.get_buffer();
|
const int64_t data_size = macro_handle_.get_data_size();
|
||||||
const int64_t data_size = macro_handle.get_data_size();
|
|
||||||
int64_t header_size = 0;
|
int64_t header_size = 0;
|
||||||
if (OB_FAIL(verify_checksum(data_buf, data_size, header_size, buf_len))) {
|
if (OB_FAIL(verify_checksum(data_buf, data_size, header_size, buf_len))) {
|
||||||
LOG_WARN("fail to verify checksum", K(ret), KP(data_buf), K(data_size), K(header_size), K(buf_len));
|
LOG_WARN("fail to verify checksum", K(ret), KP(data_buf), K(data_size), K(header_size), K(buf_len));
|
||||||
@ -341,6 +347,18 @@ int ObSharedBlockReadHandle::verify_checksum(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObSharedBlockReadHandle::set_macro_handle(const ObMacroBlockHandle ¯o_handle)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (OB_UNLIKELY(!macro_handle.is_valid())) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("invalid args", K(ret), K(macro_handle));
|
||||||
|
} else {
|
||||||
|
macro_handle_ = macro_handle;
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int ObSharedBlockWriteHandle::get_write_ctx(ObSharedBlocksWriteCtx &write_ctx)
|
int ObSharedBlockWriteHandle::get_write_ctx(ObSharedBlocksWriteCtx &write_ctx)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
@ -455,7 +473,7 @@ int ObSharedBlockLinkIter::get_next(ObIAllocator &allocator, char *&buf, int64_t
|
|||||||
} else if (OB_FAIL(block_handle.get_data(allocator, buf, buf_len))) {
|
} else if (OB_FAIL(block_handle.get_data(allocator, buf, buf_len))) {
|
||||||
LOG_WARN("Fail to get data", K(ret), K(block_handle));
|
LOG_WARN("Fail to get data", K(ret), K(block_handle));
|
||||||
} else {
|
} else {
|
||||||
ObMacroBlockHandle ¯o_handle = block_handle.macro_handles_.at(0);
|
ObMacroBlockHandle ¯o_handle = block_handle.macro_handle_;
|
||||||
const ObSharedBlockHeader *header =
|
const ObSharedBlockHeader *header =
|
||||||
reinterpret_cast<const ObSharedBlockHeader *>(macro_handle.get_buffer());
|
reinterpret_cast<const ObSharedBlockHeader *>(macro_handle.get_buffer());
|
||||||
cur_ = header->prev_addr_;
|
cur_ = header->prev_addr_;
|
||||||
@ -856,10 +874,8 @@ int ObSharedBlockReaderWriter::async_read(
|
|||||||
LOG_WARN("Fail to get block addr", K(ret), K(read_info));
|
LOG_WARN("Fail to get block addr", K(ret), K(read_info));
|
||||||
} else if (OB_FAIL(macro_handle.async_read(macro_read_info))) {
|
} else if (OB_FAIL(macro_handle.async_read(macro_read_info))) {
|
||||||
LOG_WARN("Fail to async read block", K(ret), K(macro_read_info));
|
LOG_WARN("Fail to async read block", K(ret), K(macro_read_info));
|
||||||
} else if (OB_FAIL(block_handle.add_macro_handle(macro_handle))) {
|
} else if (OB_FAIL(block_handle.set_macro_handle(macro_handle))) {
|
||||||
LOG_WARN("Fail to add macro handle", K(ret), K(macro_read_info));
|
LOG_WARN("Fail to add macro handle", K(ret), K(macro_read_info));
|
||||||
} else if (OB_FAIL(block_handle.add_meta_addr(read_info.addr_))) {
|
|
||||||
LOG_WARN("Fail to add meta addr", K(ret));
|
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -127,18 +127,21 @@ protected:
|
|||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
class ObSharedBlockReadHandle final : public ObSharedBlockBaseHandle
|
class ObSharedBlockReadHandle final
|
||||||
{
|
{
|
||||||
friend class ObSharedBlockReaderWriter;
|
friend class ObSharedBlockReaderWriter;
|
||||||
|
friend class ObSharedBlockLinkIter;
|
||||||
public:
|
public:
|
||||||
ObSharedBlockReadHandle() = default;
|
ObSharedBlockReadHandle() = default;
|
||||||
~ObSharedBlockReadHandle() = default;
|
~ObSharedBlockReadHandle() = default;
|
||||||
ObSharedBlockReadHandle(const ObSharedBlockReadHandle &other);
|
ObSharedBlockReadHandle(const ObSharedBlockReadHandle &other);
|
||||||
ObSharedBlockReadHandle &operator=(const ObSharedBlockReadHandle &other);
|
ObSharedBlockReadHandle &operator=(const ObSharedBlockReadHandle &other);
|
||||||
bool is_valid() const;
|
bool is_valid() const;
|
||||||
int wait();
|
bool is_empty() const;
|
||||||
|
int wait(const int64_t timeout_ms = -1);
|
||||||
int get_data(ObIAllocator &allocator, char *&buf, int64_t &buf_len);
|
int get_data(ObIAllocator &allocator, char *&buf, int64_t &buf_len);
|
||||||
|
void reset() { macro_handle_.reset(); }
|
||||||
|
TO_STRING_KV(K_(macro_handle));
|
||||||
public:
|
public:
|
||||||
static int parse_data(
|
static int parse_data(
|
||||||
const char *data_buf,
|
const char *data_buf,
|
||||||
@ -152,6 +155,9 @@ private:
|
|||||||
const int64_t data_size,
|
const int64_t data_size,
|
||||||
int64_t &header_size,
|
int64_t &header_size,
|
||||||
int64_t &buf_len);
|
int64_t &buf_len);
|
||||||
|
int set_macro_handle(const blocksstable::ObMacroBlockHandle ¯o_handle);
|
||||||
|
private:
|
||||||
|
blocksstable::ObMacroBlockHandle macro_handle_;
|
||||||
};
|
};
|
||||||
|
|
||||||
class ObSharedBlockWriteHandle final : public ObSharedBlockBaseHandle
|
class ObSharedBlockWriteHandle final : public ObSharedBlockBaseHandle
|
||||||
|
@ -425,7 +425,7 @@ ObStorageMetaHandle::~ObStorageMetaHandle()
|
|||||||
int ObStorageMetaHandle::get_value(const ObStorageMetaValue *&value)
|
int ObStorageMetaHandle::get_value(const ObStorageMetaValue *&value)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_FAIL(wait(GCONF._data_storage_io_timeout / 1000L))) {
|
if (!io_handle_.is_empty() && OB_FAIL(wait(GCONF._data_storage_io_timeout / 1000L))) { /*wait if not hit cache*/
|
||||||
LOG_WARN("fail to wait", K(ret), KPC(this));
|
LOG_WARN("fail to wait", K(ret), KPC(this));
|
||||||
} else {
|
} else {
|
||||||
value = cache_handle_.get_cache_value()->value_;
|
value = cache_handle_.get_cache_value()->value_;
|
||||||
@ -463,12 +463,10 @@ bool ObStorageMetaHandle::is_valid() const
|
|||||||
int ObStorageMetaHandle::wait(const int64_t timeout_ms)
|
int ObStorageMetaHandle::wait(const int64_t timeout_ms)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
UNUSED(timeout_ms);
|
|
||||||
// TODO(zhuixin.gsy) use timeout_ms to wait
|
|
||||||
if (OB_UNLIKELY(!phy_addr_.is_block())) {
|
if (OB_UNLIKELY(!phy_addr_.is_block())) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("unexpected meta address", K(ret), K_(phy_addr));
|
LOG_WARN("unexpected meta address", K(ret), K_(phy_addr));
|
||||||
} else if (OB_FAIL(io_handle_.wait())) {
|
} else if (OB_FAIL(io_handle_.wait(timeout_ms))) {
|
||||||
LOG_WARN("fail to wait io handle", K(ret), K(io_handle_));
|
LOG_WARN("fail to wait io handle", K(ret), K(io_handle_));
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -203,7 +203,7 @@ int ObFullTabletCreator::persist_tablet()
|
|||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_ERROR("unexpected not memory tablet addr", K(ret), K(key), K(addr), K(old_handle), K(old_tablet->is_empty_shell()));
|
LOG_ERROR("unexpected not memory tablet addr", K(ret), K(key), K(addr), K(old_handle), K(old_tablet->is_empty_shell()));
|
||||||
} else if (addr != old_addr) {
|
} else if (addr != old_addr) {
|
||||||
if (addr.is_block()) {
|
if (addr.is_disked()) {
|
||||||
LOG_INFO("full tablet has been persisted, skip this", K(ret), K(key), K(old_addr), K(addr));
|
LOG_INFO("full tablet has been persisted, skip this", K(ret), K(key), K(old_addr), K(addr));
|
||||||
} else {
|
} else {
|
||||||
ret = OB_NOT_THE_OBJECT; // create_memtable may change the addr, push back to queue
|
ret = OB_NOT_THE_OBJECT; // create_memtable may change the addr, push back to queue
|
||||||
|
Reference in New Issue
Block a user