Optimize tmp file

This commit is contained in:
godchen0212
2023-07-14 04:42:06 +00:00
committed by ob-robot
parent 6734061113
commit 9b23516657
12 changed files with 1682 additions and 883 deletions

View File

@ -48,20 +48,23 @@ bool ObTmpFileIOInfo::is_valid() const
}
ObTmpFileIOHandle::ObTmpFileIOHandle()
: is_read_(false),
has_wait_(false),
update_offset_in_file_(false),
fd_(OB_INVALID_FD),
dir_id_(OB_INVALID_ID),
size_(0),
expect_read_size_(0),
last_read_offset_(-1),
tenant_id_(OB_INVALID_TENANT_ID),
buf_(NULL),
io_flag_(),
io_handles_(),
: io_handles_(),
page_cache_handles_(),
block_cache_handles_(),
write_block_ids_(),
fd_(OB_INVALID_FD),
dir_id_(OB_INVALID_ID),
tenant_id_(OB_INVALID_TENANT_ID),
buf_(NULL),
size_(0),
is_read_(false),
has_wait_(false),
is_finished_(false),
ret_code_(OB_SUCCESS),
expect_read_size_(0),
last_read_offset_(-1),
io_flag_(),
update_offset_in_file_(false),
last_fd_(OB_INVALID_FD),
last_extent_id_(0)
{
@ -77,18 +80,20 @@ int ObTmpFileIOHandle::prepare_read(
const int64_t read_offset,
const common::ObIOFlag io_flag,
char *read_buf,
ObTmpFile *file)
int64_t fd,
int64_t dir_id,
uint64_t tenant_id)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(read_buf) ||OB_ISNULL(file)) {
if (OB_ISNULL(read_buf)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), KP_(buf));
} else {
buf_ = read_buf;
size_ = 0;
fd_ = file->get_fd();
dir_id_ = file->get_dir_id();
tenant_id_ = file->get_tenant_id();
fd_ = fd;
dir_id_ = dir_id;
tenant_id_ = tenant_id;
is_read_ = true;
has_wait_ = false;
expect_read_size_ = read_size;
@ -102,18 +107,25 @@ int ObTmpFileIOHandle::prepare_read(
return ret;
}
int ObTmpFileIOHandle::prepare_write(char *write_buf, const int64_t write_size, ObTmpFile *file)
int ObTmpFileIOHandle::prepare_write(
char *write_buf,
const int64_t write_size,
int64_t fd,
int64_t dir_id,
uint64_t tenant_id)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(write_buf) || OB_ISNULL(file)) {
if (OB_ISNULL(write_buf)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), KP_(buf));
} else if (OB_FAIL(write_block_ids_.create(17))) {
STORAGE_LOG(WARN, "create write block id set failed", K(ret), KP_(buf));
} else {
buf_ = write_buf;
size_ = write_size;
fd_ = file->get_fd();
dir_id_ = file->get_dir_id();
tenant_id_ = file->get_tenant_id();
fd_ = fd;
dir_id_ = dir_id;
tenant_id_ = tenant_id;
is_read_ = false;
has_wait_ = false;
expect_read_size_ = 0;
@ -124,13 +136,67 @@ int ObTmpFileIOHandle::prepare_write(char *write_buf, const int64_t write_size,
}
int ObTmpFileIOHandle::wait(const int64_t timeout_ms)
{
int ret = OB_SUCCESS;
if (timeout_ms < 0) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument. timeout must be positive", K(ret), K(timeout_ms));
} else if (!is_finished_) {
if (is_read_ && OB_FAIL(wait_read_finish(timeout_ms))) {
STORAGE_LOG(WARN, "wait read finish failed", K(ret), K(timeout_ms), K(is_read_));
} else if (!is_read_ && OB_FAIL(wait_write_finish(timeout_ms))) {
STORAGE_LOG(WARN, "wait write finish failed", K(ret), K(timeout_ms), K(is_read_));
}
ret_code_ = ret;
is_finished_ = true;
}
if (OB_SUCC(ret)) {
if (OB_FAIL(ret_code_)) {
STORAGE_LOG(WARN, "tmp file io error", K(ret), KPC(this));
}
}
return ret;
}
int ObTmpFileIOHandle::wait_write_finish(const int64_t timeout_ms)
{
int ret = OB_SUCCESS;
if (write_block_ids_.size() == 0) {
STORAGE_LOG(DEBUG, "write block ids size is 0", K(ret), K(timeout_ms));
} else {
// iter all blocks, execute wait
common::hash::ObHashSet<int64_t>::const_iterator iter;
int64_t begin_us = ObTimeUtility::fast_current_time();
int64_t wait_ms = timeout_ms;
for (iter = write_block_ids_.begin(); OB_SUCC(ret) && iter != write_block_ids_.end(); ++iter) {
const int64_t &blk_id = iter->first;
if (OB_FAIL(OB_TMP_FILE_STORE.wait_write_finish(tenant_id_, blk_id, wait_ms))) {
STORAGE_LOG(WARN, "fail to wait write finish", K(ret), K(blk_id), K(timeout_ms));
}
wait_ms = timeout_ms - (ObTimeUtility::fast_current_time() - begin_us) / 1000;
if (OB_SUCC(ret) && OB_UNLIKELY(wait_ms <= 0)) {
ret = OB_TIMEOUT;
STORAGE_LOG(WARN, "fail to wait tmp file write finish", K(ret), K(wait_ms), K(blk_id), K(timeout_ms));
}
}
int bret = OB_SUCCESS;
if (OB_UNLIKELY(OB_SUCCESS != (bret = write_block_ids_.destroy()))) {
STORAGE_LOG(WARN, "fail to destroy write block id set", K(bret), K(wait_ms), K(timeout_ms));
}
}
return ret;
}
int ObTmpFileIOHandle::wait_read_finish(const int64_t timeout_ms)
{
int ret = OB_SUCCESS;
ObTmpFileHandle file_handle;
if (OB_UNLIKELY(has_wait_ && is_read_)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "read wait() isn't reentrant interface, shouldn't call again", K(ret));
} else if (OB_FAIL(do_wait(timeout_ms))) {
} else if (OB_FAIL(do_read_wait(timeout_ms))) {
STORAGE_LOG(WARN, "fail to wait tmp file io", K(ret), K(timeout_ms));
} else if (is_read_ && !has_wait_) {
if (size_ == expect_read_size_) {
@ -154,7 +220,7 @@ int ObTmpFileIOHandle::wait(const int64_t timeout_ms)
last_read_offset_,
*this))) {
STORAGE_LOG(WARN, "fail to read once batch", K(ret), K(timeout_ms), K(io_info), K(*this));
} else if (OB_FAIL(do_wait(timeout_ms))) {
} else if (OB_FAIL(do_read_wait(timeout_ms))) {
STORAGE_LOG(WARN, "fail to wait tmp file io", K(ret), K(timeout_ms));
}
}
@ -171,7 +237,7 @@ int ObTmpFileIOHandle::wait(const int64_t timeout_ms)
return ret;
}
int ObTmpFileIOHandle::do_wait(const int64_t timeout_ms)
int ObTmpFileIOHandle::do_read_wait(const int64_t timeout_ms)
{
int ret = OB_SUCCESS;
for (int32_t i = 0; OB_SUCC(ret) && i < block_cache_handles_.count(); i++) {
@ -227,6 +293,7 @@ void ObTmpFileIOHandle::reset()
io_handles_.reset();
page_cache_handles_.reset();
block_cache_handles_.reset();
write_block_ids_.destroy();
fd_ = OB_INVALID_FD;
dir_id_ = OB_INVALID_ID;
tenant_id_ = OB_INVALID_TENANT_ID;
@ -238,14 +305,25 @@ void ObTmpFileIOHandle::reset()
last_read_offset_ = -1;
io_flag_.reset();
update_offset_in_file_ = false;
is_finished_ = false;
ret_code_ = OB_SUCCESS;
}
bool ObTmpFileIOHandle::is_valid()
bool ObTmpFileIOHandle::is_valid() const
{
return OB_INVALID_FD != fd_ && OB_INVALID_ID != dir_id_ && OB_INVALID_TENANT_ID != tenant_id_
&& NULL != buf_ && size_ >= 0;
}
int ObTmpFileIOHandle::record_block_id(const int64_t block_id)
{
int ret = OB_SUCCESS;
if (OB_FAIL(write_block_ids_.set_refactored(block_id, 1))) {
STORAGE_LOG(WARN, "record block id failed", K(ret), K(block_id));
}
return ret;
}
void ObTmpFileIOHandle::set_last_extent_id(const int64_t last_extent_id)
{
last_extent_id_ = last_extent_id;
@ -394,10 +472,10 @@ int ObTmpFileExtent::read(const ObTmpFileIOInfo &io_info, const int64_t offset,
if (OB_UNLIKELY(!is_alloced_)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "ObTmpFileExtent has not been allocated", K(ret));
} else if (OB_UNLIKELY(offset < 0 || offset >= get_offset() || size <= 0
|| offset + size > get_offset()) || OB_ISNULL(buf)) {
} else if (OB_UNLIKELY(offset < 0 || offset >= offset_ || size <= 0
|| offset + size > offset_) || OB_ISNULL(buf)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(offset), K(get_offset()), K(size), K(buf));
STORAGE_LOG(WARN, "invalid argument", K(ret), K(offset), K(offset_), K(size), K(buf));
} else {
ObTmpBlockIOInfo info;
info.buf_ = buf;
@ -421,42 +499,44 @@ int ObTmpFileExtent::write(const ObTmpFileIOInfo &io_info,int64_t &size, char *&
int write_size = 0;
int64_t remain = 0;
bool is_write = false;
bool need_close = false;
if (OB_UNLIKELY(size <= 0) || OB_ISNULL(buf)) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret));
} else if (OB_UNLIKELY(!is_alloced_)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "ObTmpFileExtent has not been allocated", K(ret));
} else if (get_offset() == page_nums_ * ObTmpMacroBlock::get_default_page_size()) {
close();
} else if (offset_ == page_nums_ * ObTmpMacroBlock::get_default_page_size()) {
need_close = true;
} else {
SpinWLockGuard guard(lock_);
if (!is_closed()) {
remain = page_nums_ * ObTmpMacroBlock::get_default_page_size() - get_offset();
remain = page_nums_ * ObTmpMacroBlock::get_default_page_size() - offset_;
write_size = std::min(remain, size);
ObTmpBlockIOInfo info;
info.block_id_ = block_id_;
info.buf_ = buf;
info.io_desc_ = io_info.io_desc_;
info.offset_ = start_page_id_ * ObTmpMacroBlock::get_default_page_size() + get_offset();
info.offset_ = start_page_id_ * ObTmpMacroBlock::get_default_page_size() + offset_;
info.size_ = write_size;
info.tenant_id_ = io_info.tenant_id_;
if (OB_FAIL(OB_TMP_FILE_STORE.write(owner_->get_tenant_id(), info))) {
STORAGE_LOG(WARN, "fail to write the extent", K(ret));
} else {
ATOMIC_FAA(&offset_, write_size);
g_offset_end_ = get_offset() + g_offset_start_;
offset_ += write_size;
g_offset_end_ = offset_ + g_offset_start_;
buf += write_size;
size -= write_size;
is_write = true;
if (remain == write_size) {
need_close = true;
}
STORAGE_LOG(DEBUG, "debug tmp file: write extent", K(ret), K(info), K(*this));
}
}
}
if (is_write) {
if (remain == write_size) {
close();
}
if (need_close) {
close();
try_sync_block();
}
return ret;
}
@ -467,12 +547,12 @@ void ObTmpFileExtent::reset()
fd_ = -1;
g_offset_start_ = 0;
g_offset_end_ = 0;
ATOMIC_SET(&offset_, 0);
offset_ = 0;
owner_ = NULL;
start_page_id_ = -1;
page_nums_ = 0;
block_id_ = -1;
is_closed_= false;
ATOMIC_STORE(&is_closed_, false);
}
bool ObTmpFileExtent::is_valid()
@ -503,7 +583,7 @@ bool ObTmpFileExtent::close(bool force)
}
}
}
return is_closed_;
return is_closed();
}
bool ObTmpFileExtent::close(uint8_t &free_page_start_id, uint8_t &free_page_nums, bool force)
@ -511,30 +591,50 @@ bool ObTmpFileExtent::close(uint8_t &free_page_start_id, uint8_t &free_page_nums
free_page_start_id = ObTmpFilePageBuddy::MAX_PAGE_NUMS;
free_page_nums = 0;
SpinWLockGuard guard(lock_);
if (!is_closed_) {
if (!force && 0 != page_nums_ && 0 == get_offset()) {
if (!is_closed()) {
if (!force && 0 != page_nums_ && 0 == offset_) {
// Nothing to do. This extent is alloced just now, so it cannot be closed.
} else {
if (get_offset() != page_nums_ * ObTmpMacroBlock::get_default_page_size()) {
uint8_t offset_page_id = common::upper_align(get_offset(), ObTmpMacroBlock::get_default_page_size())
if (offset_ != page_nums_ * ObTmpMacroBlock::get_default_page_size()) {
uint8_t offset_page_id = common::upper_align(offset_, ObTmpMacroBlock::get_default_page_size())
/ ObTmpMacroBlock::get_default_page_size();
free_page_nums = page_nums_ - offset_page_id;
free_page_start_id = start_page_id_ + offset_page_id;
page_nums_ -= free_page_nums;
}
is_closed_ = true;
ATOMIC_STORE(&is_closed_, true);
}
}
return is_closed_;
return is_closed();
}
int ObTmpFileExtent::try_sync_block()
{
int ret = OB_SUCCESS;
ObTmpTenantMemBlockManager::ObIOWaitInfoHandle handle;
ObTmpMacroBlock *blk = NULL;
if (OB_FAIL(OB_TMP_FILE_STORE.get_macro_block(owner_->get_tenant_id(), block_id_, blk))) {
STORAGE_LOG(WARN, "fail to get macro block", K(ret), K(owner_->get_tenant_id()),K(block_id_));
} else if (OB_ISNULL(blk)) {
ret = OB_ERR_NULL_VALUE;
} else if (0 != blk->get_free_page_nums()) {
STORAGE_LOG(DEBUG, "ob tmp macro block has not been used up", K(ret), K(blk->get_free_page_nums()), K(owner_->get_tenant_id()),K(block_id_));
} else if (OB_FAIL(OB_TMP_FILE_STORE.wash_block(owner_->get_tenant_id(), block_id_, handle))) {
// try to flush the block to the disk. If fails, do nothing.
STORAGE_LOG(DEBUG, "fail to sync block", K(ret), K(owner_->get_tenant_id()), K(block_id_));
}
return ret;
}
void ObTmpFileExtent::unclose(const int32_t page_nums)
{
SpinWLockGuard guard(lock_);
if (page_nums >= 0) {
page_nums_ += page_nums;
}
is_closed_ = false;
ATOMIC_STORE(&is_closed_, false);
}
ObTmpFileMeta::~ObTmpFileMeta()
@ -686,18 +786,19 @@ int ObTmpFile::aio_read_without_lock(const ObTmpFileIOInfo &io_info,
{
int ret = OB_SUCCESS;
ObTmpFileExtent *tmp = nullptr;
if (OB_ISNULL(tmp = file_meta_.get_last_extent())) {
ret = OB_BAD_NULL_ERROR;
STORAGE_LOG(WARN, "fail to read, because the tmp file is empty", K(ret), KP(tmp), K(io_info));
} else if (OB_UNLIKELY(io_info.size_ > 0 && offset >= tmp->get_global_end())) {
ret = OB_ITER_END;
} else if (OB_FAIL(handle.prepare_read(io_info.size_,
offset,
io_info.io_desc_,
io_info.buf_,
this))){
file_meta_.get_fd(),
file_meta_.get_dir_id(),
io_info.tenant_id_))){
STORAGE_LOG(WARN, "fail to prepare read io handle", K(ret), K(io_info), K(offset));
} else if (OB_UNLIKELY(io_info.size_ > 0 && offset >= tmp->get_global_end())) {
ret = OB_ITER_END;
} else if (OB_FAIL(once_aio_read_batch_without_lock(io_info, offset, handle))) {
STORAGE_LOG(WARN, "fail to read one batch", K(ret), K(offset), K(handle));
} else {
@ -926,7 +1027,11 @@ int ObTmpFile::aio_write(const ObTmpFileIOInfo &io_info, ObTmpFileIOHandle &hand
} else if (!io_info.is_valid()) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(io_info), K(handle));
} else if (OB_FAIL(handle.prepare_write(io_info.buf_, io_info.size_, this))) {
} else if (OB_FAIL(handle.prepare_write(io_info.buf_,
io_info.size_,
file_meta_.get_fd(),
file_meta_.get_dir_id(),
io_info.tenant_id_))) {
STORAGE_LOG(WARN, "fail to prepare write io handle", K(ret));
} else {
tenant_id_ = io_info.tenant_id_;
@ -960,8 +1065,8 @@ int ObTmpFile::aio_write(const ObTmpFileIOInfo &io_info, ObTmpFileIOHandle &hand
} else {
alloc_size = common::upper_align(size, big_file_prealloc_size());
}
} else if (size > ObTmpMacroBlock::get_block_size()) {
alloc_size = ObTmpMacroBlock::get_block_size();
} else if (size > ObTmpFileStore::get_block_size()) {
alloc_size = ObTmpFileStore::get_block_size();
} else {
alloc_size = size;
}
@ -1003,6 +1108,18 @@ int ObTmpFile::aio_write(const ObTmpFileIOInfo &io_info, ObTmpFileIOHandle &hand
ret = OB_INVALID_ERROR;
STORAGE_LOG(WARN, "invalid extent", K(ret));
}
if (OB_SUCC(ret)) {
if (OB_ISNULL(tmp)) {
ret = OB_ERR_NULL_VALUE;
STORAGE_LOG(ERROR, "invalid tmp", K(ret));
} else if (OB_FAIL(handle.record_block_id(tmp->get_block_id()))) {
if (OB_HASH_EXIST == ret) {
ret = OB_SUCCESS;
} else {
STORAGE_LOG(WARN, "set block id failed", K(ret));
}
}
}
}
handle.sub_data_size(io_info.size_ - size);
if (OB_SUCC(ret) && !is_big_){
@ -1040,7 +1157,6 @@ int ObTmpFile::sync(const int64_t timeout_ms)
ret = OB_BAD_NULL_ERROR;
STORAGE_LOG(WARN, "the file does not have a extent", K(ret), K(timeout_ms));
} else {
// TODO : add timeout implementation
tmp->close(true/*force*/);
// all extents has been closed.
const ObIArray<ObTmpFileExtent *> &extents = file_meta_.get_extents();
@ -1052,17 +1168,44 @@ int ObTmpFile::sync(const int64_t timeout_ms)
for (int64_t i=0; OB_SUCC(ret) && i < extents.count(); ++i) {
const ObTmpFileExtent* e = extents.at(i);
const int64_t &blk_id = e->get_block_id();
if (OB_FAIL(blk_id_set. set_refactored(blk_id))) {
if (OB_FAIL(blk_id_set.set_refactored(blk_id))) {
STORAGE_LOG(WARN, "add block id to set failed", K(ret), K(blk_id));
}
}
// iter all blocks, execute sync
// iter all blocks, execute async wash.
common::hash::ObHashSet<int64_t>::const_iterator iter;
common::ObSEArray<ObTmpTenantMemBlockManager::ObIOWaitInfoHandle, 1> handles;
for (iter = blk_id_set.begin(); OB_SUCC(ret) && iter != blk_id_set.end(); ++iter) {
const int64_t &blk_id = iter->first;
if (OB_FAIL(OB_TMP_FILE_STORE.sync(tenant_id_, blk_id))) {
STORAGE_LOG(WARN, "sync block failed", K(ret), K(blk_id));
ObTmpTenantMemBlockManager::ObIOWaitInfoHandle handle;
if (OB_FAIL(OB_TMP_FILE_STORE.sync_block(tenant_id_, blk_id, handle))) {
// OB_HASH_NOT_EXIST:
// if multiple file sync same block, the block may be not exist in hash map.
// OB_STATE_NOT_MATCH:
// the extents in block may be not all close and shouldn't sync it now.
if (OB_HASH_NOT_EXIST == ret || OB_STATE_NOT_MATCH == ret) {
ret = OB_SUCCESS;
} else {
STORAGE_LOG(WARN, "sync block failed", K(ret), K(blk_id));
}
} else if (OB_NOT_NULL(handle.get_wait_info()) && OB_FAIL(handles.push_back(handle))) {
STORAGE_LOG(WARN, "push back wait handle to array failed", K(ret), K(blk_id));
}
}
int64_t begin_us = ObTimeUtility::fast_current_time();
int64_t wait_ms = timeout_ms;
for (int64_t i=0; OB_SUCC(ret) && i < handles.count(); ++i) {
const ObTmpTenantMemBlockManager::ObIOWaitInfoHandle handle = handles.at(i);
if (OB_FAIL(handle.get_wait_info()->wait(timeout_ms))) {
STORAGE_LOG(WARN, "add block id to set failed", K(ret), K(timeout_ms));
} else {
wait_ms = timeout_ms - (ObTimeUtility::fast_current_time() - begin_us) / 1000;
}
if (OB_SUCC(ret) && OB_UNLIKELY(wait_ms <= 0)) { // rarely happen
ret = OB_TIMEOUT;
STORAGE_LOG(WARN, "fail to wait tmp file sync finish", K(ret), K(wait_ms));
}
}
}
@ -1093,11 +1236,6 @@ int ObTmpFile::deep_copy(char *buf, const int64_t buf_len, ObTmpFile *&value) co
return ret;
}
int64_t ObTmpFile::get_deep_copy_size() const
{
return sizeof(*this);
}
void ObTmpFile::get_file_size(int64_t &file_size)
{
ObTmpFileExtent *tmp = file_meta_.get_last_extent();
@ -1420,17 +1558,13 @@ int ObTmpFileManager::remove(const int64_t fd)
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "ObTmpFileManager has not been inited", K(ret));
} else{
common::SpinWLockGuard guard(rm_file_lock_);
ObTmpFileHandle file_handle;
if (OB_FAIL(files_.get(fd, file_handle))) {
if (OB_FAIL(files_.erase(fd))) {
if (common::OB_ENTRY_NOT_EXIST != ret) {
STORAGE_LOG(WARN, "fail to get tmp file handle", K(ret), K(fd));
} else {
ret = OB_SUCCESS;
STORAGE_LOG(INFO, "this tmp file has been removed", K(fd), K(common::lbt()));
}
} else if (OB_FAIL(files_.erase(fd))) {
STORAGE_LOG(WARN, "fail to erase from map", K(ret));
} else {
ObTaskController::get().allow_next_syslog();
STORAGE_LOG(INFO, "succeed to remove a tmp file", K(fd), K(common::lbt()));