[CP] [CP] [TMP.FILE] support single read huge tmp file.

This commit is contained in:
Tyshawn
2022-07-20 11:31:16 +08:00
committed by wangzelin.wzl
parent cdb3f87700
commit 18019f2bf2
3 changed files with 228 additions and 84 deletions

View File

@ -43,17 +43,27 @@ bool ObTmpFileIOInfo::is_valid() const
}
ObTmpFileIOHandle::ObTmpFileIOHandle()
: tmp_file_(NULL), io_handles_(), page_cache_handles_(), block_cache_handles_(), buf_(NULL),
size_(0), is_read_(false), has_wait_(false)
{
}
: tmp_file_(NULL),
io_handles_(),
page_cache_handles_(),
block_cache_handles_(),
buf_(NULL),
size_(0),
is_read_(false),
has_wait_(false),
expect_read_size_(0),
last_read_offset_(-1),
io_flag_(),
update_offset_in_file_(false)
{}
ObTmpFileIOHandle::~ObTmpFileIOHandle()
{
reset();
}
int ObTmpFileIOHandle::prepare_read(char* read_buf, ObTmpFile* file)
int ObTmpFileIOHandle::prepare_read(const int64_t read_size, const int64_t read_offset, const common::ObIODesc &io_flag,
char *read_buf, ObTmpFile *file)
{
int ret = OB_SUCCESS;
if (NULL == read_buf || NULL == file) {
@ -65,6 +75,9 @@ int ObTmpFileIOHandle::prepare_read(char* read_buf, ObTmpFile* file)
tmp_file_ = file;
is_read_ = true;
has_wait_ = false;
expect_read_size_ = read_size;
last_read_offset_ = read_offset;
io_flag_ = io_flag;
}
return ret;
}
@ -81,6 +94,8 @@ int ObTmpFileIOHandle::prepare_write(char* write_buf, const int64_t write_size,
tmp_file_ = file;
is_read_ = false;
has_wait_ = false;
expect_read_size_ = 0;
last_read_offset_ = -1;
}
return ret;
}
@ -91,33 +106,68 @@ int ObTmpFileIOHandle::wait(const int64_t timeout_ms)
if (OB_UNLIKELY(has_wait_ && is_read_)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "read wait() isn't reentrant interface, shouldn't call again", K(ret));
} else {
for (int32_t i = 0; OB_SUCC(ret) && i < block_cache_handles_.count(); i++) {
ObBlockCacheHandle &tmp = block_cache_handles_.at(i);
MEMCPY(tmp.buf_, tmp.block_handle_.value_->get_buffer() + tmp.offset_, tmp.size_);
tmp.block_handle_.reset();
}
block_cache_handles_.reset();
for (int32_t i = 0; OB_SUCC(ret) && i < page_cache_handles_.count(); i++) {
ObPageCacheHandle &tmp = page_cache_handles_.at(i);
MEMCPY(tmp.buf_, tmp.page_handle_.value_->get_buffer() + tmp.offset_, tmp.size_);
tmp.page_handle_.reset();
}
page_cache_handles_.reset();
for (int32_t i = 0; OB_SUCC(ret) && i < io_handles_.count(); i++) {
ObIOReadHandle &tmp = io_handles_.at(i);
if (OB_FAIL(tmp.macro_handle_.wait(timeout_ms))) {
STORAGE_LOG(WARN, "fail to wait tmp read io", K(ret));
} else {
MEMCPY(tmp.buf_, tmp.macro_handle_.get_buffer() + tmp.offset_, tmp.size_);
tmp.macro_handle_.reset();
} else if (OB_FAIL(do_wait(timeout_ms))) {
STORAGE_LOG(WARN, "fail to wait tmp file io", K(ret), K(timeout_ms));
} else if (is_read_ && !has_wait_) {
if (size_ == expect_read_size_) {
// do nothing
} else if (OB_UNLIKELY(size_ > expect_read_size_)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "read size more than expected size", K(ret), K(timeout_ms));
} else {
ObTmpFileIOInfo io_info;
io_info.fd_ = tmp_file_->get_fd();
io_info.dir_id_ = tmp_file_->get_dir_id();
io_info.tenant_id_ = tmp_file_->get_tenant_id();
io_info.size_ = expect_read_size_;
io_info.buf_ = buf_;
io_info.io_desc_ = io_flag_;
while (OB_SUCC(ret) && size_ < expect_read_size_) {
if (OB_FAIL(tmp_file_->once_aio_read_batch(io_info, update_offset_in_file_, last_read_offset_, *this))) {
STORAGE_LOG(WARN, "fail to read once batch", K(ret), K(timeout_ms), K(io_info), K(*this));
} else if (OB_FAIL(do_wait(timeout_ms))) {
STORAGE_LOG(WARN, "fail to wait tmp file io", K(ret), K(timeout_ms));
}
}
}
io_handles_.reset();
has_wait_ = true;
}
if (OB_SUCC(ret) || OB_ITER_END == ret) {
has_wait_ = true;
expect_read_size_ = 0;
last_read_offset_ = -1;
update_offset_in_file_ = false;
}
return ret;
}
int ObTmpFileIOHandle::do_wait(const int64_t timeout_ms)
{
int ret = OB_SUCCESS;
for (int32_t i = 0; OB_SUCC(ret) && i < block_cache_handles_.count(); i++) {
ObBlockCacheHandle &tmp = block_cache_handles_.at(i);
MEMCPY(tmp.buf_, tmp.block_handle_.value_->get_buffer() + tmp.offset_, tmp.size_);
tmp.block_handle_.reset();
}
block_cache_handles_.reset();
for (int32_t i = 0; OB_SUCC(ret) && i < page_cache_handles_.count(); i++) {
ObPageCacheHandle &tmp = page_cache_handles_.at(i);
MEMCPY(tmp.buf_, tmp.page_handle_.value_->get_buffer() + tmp.offset_, tmp.size_);
tmp.page_handle_.reset();
}
page_cache_handles_.reset();
for (int32_t i = 0; OB_SUCC(ret) && i < io_handles_.count(); i++) {
ObIOReadHandle &tmp = io_handles_.at(i);
if (OB_FAIL(tmp.macro_handle_.wait(timeout_ms))) {
STORAGE_LOG(WARN, "fail to wait tmp read io", K(ret));
} else {
MEMCPY(tmp.buf_, tmp.macro_handle_.get_buffer() + tmp.offset_, tmp.size_);
tmp.macro_handle_.reset();
}
}
io_handles_.reset();
return ret;
}
@ -140,6 +190,9 @@ void ObTmpFileIOHandle::reset()
tmp_file_ = NULL;
is_read_ = false;
has_wait_ = false;
expect_read_size_ = 0;
last_read_offset_ = -1;
update_offset_in_file_ = false;
}
bool ObTmpFileIOHandle::is_valid()
@ -556,51 +609,98 @@ int64_t ObTmpFile::find_first_extent(const int64_t offset)
return first_extent;
}
int ObTmpFile::aio_pread_without_lock(const ObTmpFileIOInfo &io_info,
int64_t &offset, ObTmpFileIOHandle &handle)
int ObTmpFile::aio_read_without_lock(const ObTmpFileIOInfo &io_info, int64_t &offset, ObTmpFileIOHandle &handle)
{
int ret = OB_SUCCESS;
char* buf = io_info.buf_;
int64_t size = io_info.size_;
int64_t read_size = 0;
ObTmpFileExtent* tmp = file_meta_.get_last_extent();
if (NULL == tmp) {
ObTmpFileExtent *tmp = nullptr;
if (OB_ISNULL(tmp = file_meta_.get_last_extent())) {
ret = OB_BAD_NULL_ERROR;
STORAGE_LOG(WARN, "fail to read, because this temporary file is empty", K(ret), K(io_info));
} else if (size > 0 && offset >= tmp->get_global_end()) {
STORAGE_LOG(WARN, "fail to read, because the tmp file is empty", K(ret), KP(tmp), K(io_info));
} else if (OB_UNLIKELY(io_info.size_ > 0 && offset >= tmp->get_global_end())) {
ret = OB_ITER_END;
} else if (OB_FAIL(handle.prepare_read(io_info.buf_, this))) {
STORAGE_LOG(WARN, "fail to prepare read io handle", K(ret));
} else if (OB_FAIL(handle.prepare_read(io_info.size_, offset, io_info.io_desc_, io_info.buf_, this))) {
STORAGE_LOG(WARN, "fail to prepare read io handle", K(ret), K(io_info), K(offset));
} else if (once_aio_read_batch_without_lock(io_info, offset, handle)) {
STORAGE_LOG(WARN, "fail to read one batch", K(ret), K(offset), K(handle));
} else {
int64_t ith_extent = 0;
common::ObIArray<ObTmpFileExtent *> &extents = file_meta_.get_extents();
if (offset >= last_extent_min_offset_ && offset_ <= last_extent_max_offset_) {
ith_extent = last_extent_id_;
} else {
ith_extent = find_first_extent(offset);
}
for (; OB_SUCC(ret) && ith_extent < extents.count() && size > 0; ++ith_extent) {
tmp = extents.at(ith_extent);
if (tmp->get_global_start() <= offset && offset < tmp->get_global_end()) {
if (offset + size > tmp->get_global_end()) {
read_size = tmp->get_global_end() - offset;
} else {
read_size = size;
}
// read from the extent.
if (OB_FAIL(tmp->read(io_info, offset - tmp->get_global_start(), read_size, buf, handle))) {
STORAGE_LOG(WARN, "fail to read the extent", K(ret), K(io_info), K(buf), KP_(io_info.buf));
} else {
offset += read_size;
size -= read_size;
buf += read_size;
handle.add_data_size(read_size);
last_extent_id_ = ith_extent;
last_extent_min_offset_ = tmp->get_global_start();
last_extent_max_offset_ = tmp->get_global_end();
}
handle.set_last_read_offset(offset);
}
return ret;
}
int ObTmpFile::once_aio_read_batch(
const ObTmpFileIOInfo &io_info, const bool need_update_offset, int64_t &offset, ObTmpFileIOHandle &handle)
{
int ret = OB_SUCCESS;
ObTmpFileExtent *tmp = nullptr;
const int64_t remain_size = io_info.size_ - handle.get_data_size();
SpinWLockGuard guard(lock_);
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "ObTmpFile has not been initialized", K(ret));
} else if (OB_UNLIKELY(offset < 0 || remain_size < 0) || OB_ISNULL(io_info.buf_)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(offset), K(remain_size), KP(io_info.buf_));
} else if (OB_ISNULL(tmp = file_meta_.get_last_extent())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "unexpected error, null tmp file extent", K(ret), KP(tmp), K(io_info));
} else if (OB_UNLIKELY(remain_size > 0 && offset >= tmp->get_global_end())) {
ret = OB_ITER_END;
} else if (once_aio_read_batch_without_lock(io_info, offset, handle)) {
STORAGE_LOG(WARN, "fail to read one batch", K(ret), K(offset), K(handle));
} else {
handle.set_last_read_offset(offset);
}
if (need_update_offset) {
offset_ = offset;
}
return ret;
}
int ObTmpFile::once_aio_read_batch_without_lock(
const ObTmpFileIOInfo &io_info, int64_t &offset, ObTmpFileIOHandle &handle)
{
int ret = OB_SUCCESS;
int64_t one_batch_read_size = 0;
char *buf = io_info.buf_ + handle.get_data_size();
int64_t remain_size = io_info.size_ - handle.get_data_size();
int64_t read_size = 0;
int64_t ith_extent = 0;
ObTmpFileExtent *tmp = nullptr;
common::ObIArray<ObTmpFileExtent *> &extents = file_meta_.get_extents();
if (offset >= last_extent_min_offset_ && offset_ <= last_extent_max_offset_) {
ith_extent = last_extent_id_;
} else {
ith_extent = find_first_extent(offset);
}
while (OB_SUCC(ret) && ith_extent < extents.count() && remain_size > 0 && one_batch_read_size < READ_SIZE_PER_BATCH) {
tmp = extents.at(ith_extent);
if (tmp->get_global_start() <= offset && offset < tmp->get_global_end()) {
if (offset + remain_size > tmp->get_global_end()) {
read_size = tmp->get_global_end() - offset;
} else {
read_size = remain_size;
}
// read from the extent.
if (OB_FAIL(tmp->read(io_info, offset - tmp->get_global_start(), read_size, buf, handle))) {
STORAGE_LOG(WARN, "fail to read the extent", K(ret), K(io_info), K(buf), KP_(io_info.buf));
} else {
offset += read_size;
remain_size -= read_size;
buf += read_size;
one_batch_read_size += read_size;
handle.add_data_size(read_size);
last_extent_id_ = ith_extent;
last_extent_min_offset_ = tmp->get_global_start();
last_extent_max_offset_ = tmp->get_global_end();
}
}
++ith_extent;
}
return ret;
}
@ -614,10 +714,12 @@ int ObTmpFile::aio_read(const ObTmpFileIOInfo& io_info, ObTmpFileIOHandle& handl
} else {
tenant_id_ = io_info.tenant_id_;
SpinWLockGuard guard(lock_);
if (OB_FAIL(aio_pread_without_lock(io_info, offset_, handle))) {
if (OB_FAIL(aio_read_without_lock(io_info, offset_, handle))) {
if (OB_ITER_END != ret) {
STORAGE_LOG(WARN, "fail to do aio pread without lock", K(ret));
STORAGE_LOG(WARN, "fail to do aio read without lock", K(ret));
}
} else {
handle.set_update_offset_in_file();
}
}
return ret;
@ -631,9 +733,9 @@ int ObTmpFile::aio_pread(const ObTmpFileIOInfo& io_info, const int64_t offset, O
} else {
int64_t tmp_offset = offset;
SpinRLockGuard guard(lock_);
if (OB_FAIL(aio_pread_without_lock(io_info, tmp_offset, handle))) {
if (OB_FAIL(aio_read_without_lock(io_info, tmp_offset, handle))) {
if (OB_ITER_END != ret) {
STORAGE_LOG(WARN, "fail to do aio pread without lock", K(ret));
STORAGE_LOG(WARN, "fail to do aio read without lock", K(ret));
}
}
}
@ -677,7 +779,9 @@ int ObTmpFile::read(const ObTmpFileIOInfo& io_info, const int64_t timeout_ms, Ob
}
}
} else if (OB_FAIL(handle.wait(timeout_ms))) {
STORAGE_LOG(WARN, "fail to wait io finish", K(ret), K(timeout_ms));
if (OB_ITER_END != ret) {
STORAGE_LOG(WARN, "fail to wait io finish", K(ret), K(timeout_ms));
}
}
return ret;
}
@ -700,7 +804,9 @@ int ObTmpFile::pread(
}
}
} else if (OB_FAIL(handle.wait(timeout_ms))) {
STORAGE_LOG(WARN, "fail to wait io finish", K(ret), K(timeout_ms));
if (OB_ITER_END != ret) {
STORAGE_LOG(WARN, "fail to wait io finish", K(ret), K(timeout_ms));
}
}
return ret;
}
@ -811,7 +917,7 @@ uint64_t ObTmpFile::get_tenant_id() const
return tenant_id_;
}
int64_t ObTmpFile::get_fd()
int64_t ObTmpFile::get_fd() const
{
return file_meta_.get_fd();
}