Fix direct load merge stuck.

This commit is contained in:
godchen0212
2023-06-22 01:24:27 +00:00
committed by ob-robot
parent e2c9be5adc
commit ce443c7fce
2 changed files with 55 additions and 49 deletions

View File

@ -62,9 +62,8 @@ ObTmpFileIOHandle::ObTmpFileIOHandle()
io_handles_(),
page_cache_handles_(),
block_cache_handles_(),
last_extent_id_(0),
last_extent_min_offset_(0),
last_extent_max_offset_(INT64_MAX)
last_fd_(OB_INVALID_FD),
last_extent_id_(0)
{
}
@ -95,6 +94,10 @@ int ObTmpFileIOHandle::prepare_read(
expect_read_size_ = read_size;
last_read_offset_ = read_offset;
io_flag_ = io_flag;
if (last_fd_ != fd_) {
last_fd_ = fd_;
last_extent_id_ = 0;
}
}
return ret;
}
@ -243,24 +246,14 @@ bool ObTmpFileIOHandle::is_valid()
&& NULL != buf_ && size_ >= 0;
}
void ObTmpFileIOHandle::update_extent_idx_cache(const int64_t last_extent_id,
const int64_t last_extent_min_offset,
const int64_t last_extent_max_offset)
void ObTmpFileIOHandle::set_last_extent_id(const int64_t last_extent_id)
{
last_extent_id_ = last_extent_id;
last_extent_min_offset_ = last_extent_min_offset;
last_extent_max_offset_ = last_extent_max_offset;
}
int64_t ObTmpFileIOHandle::get_extent_idx_from_cache(const int64_t offset) const
int64_t ObTmpFileIOHandle::get_last_extent_id() const
{
int64_t ith_extent = -1;
if (offset >= last_extent_min_offset_ && offset < last_extent_max_offset_) {
ith_extent = last_extent_id_;
} else if (offset == last_extent_max_offset_) {
ith_extent = last_extent_id_ + 1;
}
return ith_extent;
return last_extent_id_;
}
ObTmpFileIOHandle::ObIOReadHandle::ObIOReadHandle()
@ -723,25 +716,28 @@ int ObTmpFile::once_aio_read_batch(
ObTmpFileExtent *tmp = nullptr;
const int64_t remain_size = io_info.size_ - handle.get_data_size();
SpinWLockGuard guard(lock_);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "ObTmpFile has not been initialized", K(ret));
} else if (OB_UNLIKELY(offset < 0 || remain_size < 0) || OB_ISNULL(io_info.buf_)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(offset), K(remain_size), KP(io_info.buf_));
} else if (OB_ISNULL(tmp = file_meta_.get_last_extent())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "unexpected error, null tmp file extent", K(ret), KP(tmp), K(io_info));
} else if (OB_UNLIKELY(remain_size > 0 && offset >= tmp->get_global_end())) {
ret = OB_ITER_END;
} else if (OB_FAIL(once_aio_read_batch_without_lock(io_info, offset, handle))) {
STORAGE_LOG(WARN, "fail to read one batch", K(ret), K(offset), K(handle));
} else {
handle.set_last_read_offset(offset);
{
SpinRLockGuard guard(lock_);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "ObTmpFile has not been initialized", K(ret));
} else if (OB_UNLIKELY(offset < 0 || remain_size < 0) || OB_ISNULL(io_info.buf_)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(offset), K(remain_size), KP(io_info.buf_));
} else if (OB_ISNULL(tmp = file_meta_.get_last_extent())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "unexpected error, null tmp file extent", K(ret), KP(tmp), K(io_info));
} else if (OB_UNLIKELY(remain_size > 0 && offset >= tmp->get_global_end())) {
ret = OB_ITER_END;
} else if (OB_FAIL(once_aio_read_batch_without_lock(io_info, offset, handle))) {
STORAGE_LOG(WARN, "fail to read one batch", K(ret), K(offset), K(handle));
} else {
handle.set_last_read_offset(offset);
}
}
if (need_update_offset) {
SpinWLockGuard guard(lock_);
offset_ = offset;
}
return ret;
@ -759,11 +755,7 @@ int ObTmpFile::once_aio_read_batch_without_lock(
int64_t read_size = 0;
ObTmpFileExtent *tmp = nullptr;
common::ObIArray<ObTmpFileExtent *> &extents = file_meta_.get_extents();
int64_t ith_extent = handle.get_extent_idx_from_cache(offset);
if (OB_UNLIKELY(-1 == ith_extent)) {
ith_extent = find_first_extent(offset);
}
int64_t ith_extent = get_extent_cache(offset, handle);
while (OB_SUCC(ret)
&& ith_extent < extents.count()
@ -790,12 +782,32 @@ int ObTmpFile::once_aio_read_batch_without_lock(
++ith_extent;
}
if (OB_SUCC(ret) && OB_NOT_NULL(tmp) && OB_LIKELY(ith_extent > 0)) {
handle.update_extent_idx_cache(ith_extent - 1, tmp->get_global_start(), tmp->get_global_end());
if (OB_SUCC(ret) && OB_LIKELY(ith_extent > 0)) {
handle.set_last_extent_id(ith_extent - 1);
}
return ret;
}
int64_t ObTmpFile::get_extent_cache(const int64_t offset, const ObTmpFileIOHandle &handle)
{
common::ObIArray<ObTmpFileExtent *> &extents = file_meta_.get_extents();
int64_t ith_extent = -1;
int64_t last_extent_id = handle.get_last_extent_id();
if (OB_UNLIKELY(last_extent_id < 0 || last_extent_id >= extents.count() - 1)) {
ith_extent = find_first_extent(offset);
} else if (OB_LIKELY(extents.at(last_extent_id)->get_global_start() <= offset
&& offset < extents.at(last_extent_id)->get_global_end())) {
ith_extent = last_extent_id;
} else if (offset == extents.at(last_extent_id)->get_global_end()
&& last_extent_id != extents.count() - 1) {
ith_extent = last_extent_id + 1;
} else {
ith_extent = find_first_extent(offset);
}
return ith_extent;
}
int ObTmpFile::aio_read(const ObTmpFileIOInfo &io_info, ObTmpFileIOHandle &handle)
{
int ret = OB_SUCCESS;