add defense code for tmp file in case of misuse.
This commit is contained in:
@ -40,14 +40,10 @@ bool ObTmpFileIOInfo::is_valid() const
|
||||
}
|
||||
|
||||
ObTmpFileIOHandle::ObTmpFileIOHandle()
|
||||
: tmp_file_(NULL),
|
||||
io_handles_(),
|
||||
page_cache_handles_(),
|
||||
block_cache_handles_(),
|
||||
buf_(NULL),
|
||||
size_(0),
|
||||
is_read_(false)
|
||||
{}
|
||||
: tmp_file_(NULL), io_handles_(), page_cache_handles_(), block_cache_handles_(), buf_(NULL),
|
||||
size_(0), is_read_(false), has_wait_(false)
|
||||
{
|
||||
}
|
||||
|
||||
ObTmpFileIOHandle::~ObTmpFileIOHandle()
|
||||
{
|
||||
@ -65,6 +61,7 @@ int ObTmpFileIOHandle::prepare_read(char* read_buf, ObTmpFile* file)
|
||||
size_ = 0;
|
||||
tmp_file_ = file;
|
||||
is_read_ = true;
|
||||
has_wait_ = false;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -80,6 +77,7 @@ int ObTmpFileIOHandle::prepare_write(char* write_buf, const int64_t write_size,
|
||||
size_ = write_size;
|
||||
tmp_file_ = file;
|
||||
is_read_ = false;
|
||||
has_wait_ = false;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -87,29 +85,35 @@ int ObTmpFileIOHandle::prepare_write(char* write_buf, const int64_t write_size,
|
||||
int ObTmpFileIOHandle::wait(const int64_t timeout_ms)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
for (int32_t i = 0; OB_SUCC(ret) && i < block_cache_handles_.count(); i++) {
|
||||
ObBlockCacheHandle& tmp = block_cache_handles_.at(i);
|
||||
MEMCPY(tmp.buf_, tmp.block_handle_.value_->get_buffer() + tmp.offset_, tmp.size_);
|
||||
tmp.block_handle_.reset();
|
||||
}
|
||||
for (int32_t i = 0; OB_SUCC(ret) && i < page_cache_handles_.count(); i++) {
|
||||
ObPageCacheHandle& tmp = page_cache_handles_.at(i);
|
||||
MEMCPY(tmp.buf_, tmp.page_handle_.value_->get_buffer() + tmp.offset_, tmp.size_);
|
||||
tmp.page_handle_.reset();
|
||||
}
|
||||
for (int32_t i = 0; OB_SUCC(ret) && i < io_handles_.count(); i++) {
|
||||
ObIOReadHandle& tmp = io_handles_.at(i);
|
||||
if (OB_FAIL(tmp.macro_handle_.wait(timeout_ms))) {
|
||||
STORAGE_LOG(WARN, "fail to wait tmp read io", K(ret));
|
||||
} else {
|
||||
MEMCPY(tmp.buf_, tmp.macro_handle_.get_buffer() + tmp.offset_, tmp.size_);
|
||||
tmp.macro_handle_.reset();
|
||||
if (OB_UNLIKELY(has_wait_ && is_read_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "read wait() isn't reentrant interface, shouldn't call again", K(ret));
|
||||
} else {
|
||||
for (int32_t i = 0; OB_SUCC(ret) && i < block_cache_handles_.count(); i++) {
|
||||
ObBlockCacheHandle &tmp = block_cache_handles_.at(i);
|
||||
MEMCPY(tmp.buf_, tmp.block_handle_.value_->get_buffer() + tmp.offset_, tmp.size_);
|
||||
tmp.block_handle_.reset();
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
io_handles_.reset();
|
||||
page_cache_handles_.reset();
|
||||
block_cache_handles_.reset();
|
||||
|
||||
for (int32_t i = 0; OB_SUCC(ret) && i < page_cache_handles_.count(); i++) {
|
||||
ObPageCacheHandle &tmp = page_cache_handles_.at(i);
|
||||
MEMCPY(tmp.buf_, tmp.page_handle_.value_->get_buffer() + tmp.offset_, tmp.size_);
|
||||
tmp.page_handle_.reset();
|
||||
}
|
||||
page_cache_handles_.reset();
|
||||
|
||||
for (int32_t i = 0; OB_SUCC(ret) && i < io_handles_.count(); i++) {
|
||||
ObIOReadHandle &tmp = io_handles_.at(i);
|
||||
if (OB_FAIL(tmp.macro_handle_.wait(timeout_ms))) {
|
||||
STORAGE_LOG(WARN, "fail to wait tmp read io", K(ret));
|
||||
} else {
|
||||
MEMCPY(tmp.buf_, tmp.macro_handle_.get_buffer() + tmp.offset_, tmp.size_);
|
||||
tmp.macro_handle_.reset();
|
||||
}
|
||||
}
|
||||
io_handles_.reset();
|
||||
has_wait_ = true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -132,6 +136,7 @@ void ObTmpFileIOHandle::reset()
|
||||
size_ = 0;
|
||||
tmp_file_ = NULL;
|
||||
is_read_ = false;
|
||||
has_wait_ = false;
|
||||
}
|
||||
|
||||
bool ObTmpFileIOHandle::is_valid()
|
||||
@ -1000,6 +1005,7 @@ int ObTmpFileManager::aio_read(const ObTmpFileIOInfo& io_info, ObTmpFileIOHandle
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTmpFileHandle file_handle;
|
||||
handle.reset();
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
STORAGE_LOG(WARN, "ObTmpFileManager has not been inited", K(ret));
|
||||
@ -1020,6 +1026,7 @@ int ObTmpFileManager::aio_pread(const ObTmpFileIOInfo& io_info, const int64_t of
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTmpFileHandle file_handle;
|
||||
handle.reset();
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
STORAGE_LOG(WARN, "ObTmpFileManager has not been inited", K(ret));
|
||||
@ -1040,6 +1047,7 @@ int ObTmpFileManager::read(const ObTmpFileIOInfo& io_info, const int64_t timeout
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTmpFileHandle file_handle;
|
||||
handle.reset();
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
STORAGE_LOG(WARN, "ObTmpFileManager has not been inited", K(ret));
|
||||
@ -1061,6 +1069,7 @@ int ObTmpFileManager::pread(
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTmpFileHandle file_handle;
|
||||
handle.reset();
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
STORAGE_LOG(WARN, "ObTmpFileManager has not been inited", K(ret));
|
||||
@ -1081,6 +1090,7 @@ int ObTmpFileManager::aio_write(const ObTmpFileIOInfo& io_info, ObTmpFileIOHandl
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTmpFileHandle file_handle;
|
||||
handle.reset();
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
STORAGE_LOG(WARN, "ObTmpFileManager has not been inited", K(ret));
|
||||
|
@ -126,6 +126,7 @@ private:
|
||||
char* buf_;
|
||||
int64_t size_; // has read or to write size.
|
||||
bool is_read_;
|
||||
bool has_wait_;
|
||||
DISALLOW_COPY_AND_ASSIGN(ObTmpFileIOHandle);
|
||||
};
|
||||
|
||||
|
@ -1423,6 +1423,59 @@ TEST_F(TestTmpFile, test_drop_tenant_file)
|
||||
ASSERT_EQ(0, ObTmpFileStore::get_instance().tenant_file_stores_.size());
|
||||
}
|
||||
|
||||
TEST_F(TestTmpFile, test_handle_double_wait)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t dir = -1;
|
||||
int64_t fd = -1;
|
||||
ObTmpFileIOInfo io_info;
|
||||
ObTmpFileIOHandle handle;
|
||||
ret = ObTmpFileManager::get_instance().alloc_dir(dir);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ret = ObTmpFileManager::get_instance().open(fd, dir);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
char *write_buf = new char [256];
|
||||
for (int i = 0; i < 256; ++i) {
|
||||
write_buf[i] = static_cast<char>(i);
|
||||
}
|
||||
char *read_buf = new char [256];
|
||||
io_info.fd_ = fd;
|
||||
io_info.tenant_id_ = 1;
|
||||
io_info.io_desc_.category_ = USER_IO;
|
||||
io_info.io_desc_.wait_event_no_ = 2;
|
||||
io_info.buf_ = write_buf;
|
||||
io_info.size_ = 256;
|
||||
const int64_t timeout_ms = 5000;
|
||||
int64_t write_time = ObTimeUtility::current_time();
|
||||
ret = ObTmpFileManager::get_instance().write(io_info, timeout_ms);
|
||||
write_time = ObTimeUtility::current_time() - write_time;
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
io_info.buf_ = read_buf;
|
||||
|
||||
|
||||
int64_t read_time = ObTimeUtility::current_time();
|
||||
ret = ObTmpFileManager::get_instance().pread(io_info, 0, timeout_ms, handle);
|
||||
read_time = ObTimeUtility::current_time() - read_time;
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(256, handle.get_data_size());
|
||||
int cmp = memcmp(handle.get_buffer(), write_buf, 256);
|
||||
ASSERT_EQ(0, cmp);
|
||||
|
||||
ASSERT_EQ(OB_ERR_UNEXPECTED, handle.wait(timeout_ms));
|
||||
|
||||
STORAGE_LOG(INFO, "test_handle_double_wait");
|
||||
STORAGE_LOG(INFO, "io time", K(write_time), K(read_time));
|
||||
ObTmpTenantFileStore *store = NULL;
|
||||
OB_TMP_FILE_STORE.get_store(1, store);
|
||||
store->print_block_usage();
|
||||
ObMallocAllocator::get_instance()->print_tenant_memory_usage(1);
|
||||
ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(1);
|
||||
ObMallocAllocator::get_instance()->print_tenant_memory_usage(500);
|
||||
ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(500);
|
||||
|
||||
ObTmpFileManager::get_instance().remove(fd);
|
||||
}
|
||||
|
||||
} // end namespace unittest
|
||||
} // end namespace oceanbase
|
||||
|
||||
|
Reference in New Issue
Block a user