fix bug of using MTL for tenant 500
This commit is contained in:
parent
f91c51797c
commit
e1d98420f5
@ -63,7 +63,6 @@ public:
|
||||
static void TearDownTestCase();
|
||||
};
|
||||
static ObSimpleMemLimitGetter getter;
|
||||
static const int64_t TEST_ROWKEY_COLUMN_CNT = 2;
|
||||
|
||||
// ATTENTION!
|
||||
// currently, we only initialize modules about tmp file at the beginning of unit test and
|
||||
@ -268,7 +267,6 @@ TEST_F(TestTmpFile, test_read)
|
||||
ret = MTL(ObTenantTmpFileManager *)->get_sn_file_manager().get_tmp_file(fd, file_handle);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
file_handle.get()->page_idx_cache_.max_bucket_array_capacity_ = SMALL_WBP_IDX_CACHE_MAX_CAPACITY;
|
||||
file_handle.reset();
|
||||
|
||||
ObTmpFileIOInfo io_info;
|
||||
io_info.fd_ = fd;
|
||||
@ -282,11 +280,8 @@ TEST_F(TestTmpFile, test_read)
|
||||
write_time = ObTimeUtility::current_time() - write_time;
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
ret = MTL(ObTenantTmpFileManager *)->get_sn_file_manager().get_tmp_file(fd, file_handle);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
int64_t wbp_begin_offset = file_handle.get()->cal_wbp_begin_offset();
|
||||
ASSERT_GT(wbp_begin_offset, 0);
|
||||
file_handle.get()->page_idx_cache_.max_bucket_array_capacity_ = SMALL_WBP_IDX_CACHE_MAX_CAPACITY;
|
||||
file_handle.reset();
|
||||
|
||||
int64_t read_time = ObTimeUtility::current_time();
|
||||
@ -1204,7 +1199,6 @@ TEST_F(TestTmpFile, test_write_last_page_during_flush)
|
||||
LOG_INFO("test_write_last_page_during_flush");
|
||||
}
|
||||
|
||||
// generate 750MB random data.
|
||||
// this test will trigger flush and evict logic for both data and meta pages.
|
||||
void test_big_file(const int64_t write_size, const int64_t wbp_mem_limit, ObTmpFileIOInfo io_info)
|
||||
{
|
||||
@ -1239,7 +1233,7 @@ void test_big_file(const int64_t write_size, const int64_t wbp_mem_limit, ObTmpF
|
||||
io_info.io_desc_.set_wait_event(2);
|
||||
io_info.io_timeout_ms_ = DEFAULT_IO_WAIT_TIME_MS;
|
||||
|
||||
// 1. write 750MB data
|
||||
// 1. write data
|
||||
io_info.buf_ = write_buf;
|
||||
io_info.size_ = write_size;
|
||||
int64_t write_time = ObTimeUtility::current_time();
|
||||
@ -1247,7 +1241,7 @@ void test_big_file(const int64_t write_size, const int64_t wbp_mem_limit, ObTmpF
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
write_time = ObTimeUtility::current_time() - write_time;
|
||||
|
||||
// 2. read 750MB data
|
||||
// 2. read data
|
||||
ObTmpFileIOHandle handle;
|
||||
int64_t read_size = write_size;
|
||||
char *read_buf = new char [read_size];
|
||||
@ -1619,6 +1613,9 @@ TEST_F(TestTmpFile, test_multiple_small_files)
|
||||
STORAGE_LOG(INFO, "io time", K(io_time));
|
||||
}
|
||||
|
||||
// ATTENTION
|
||||
// the case after this will increase wbp_mem_limit to BIG_WBP_MEM_LIMIT.
|
||||
// And it will never be decreased as long as it has been increased
|
||||
TEST_F(TestTmpFile, test_big_file)
|
||||
{
|
||||
const int64_t write_size = 750 * 1024 * 1024; // write 750MB data
|
||||
@ -1649,7 +1646,6 @@ TEST_F(TestTmpFile, test_big_file_disable_page_cache)
|
||||
test_big_file(write_size, wbp_mem_limit, io_info);
|
||||
}
|
||||
|
||||
// TODO, xuwei, enbale later
|
||||
TEST_F(TestTmpFile, test_aio_pread)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1733,7 +1729,7 @@ TEST_F(TestTmpFile, test_aio_pread)
|
||||
ret = MTL(ObTenantTmpFileManager *)->remove(fd);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
LOG_INFO("test_cached_read");
|
||||
LOG_INFO("test_aio_pread");
|
||||
}
|
||||
} // namespace oceanbase
|
||||
|
||||
|
@ -81,7 +81,7 @@ int ObAllVirtualTmpFileInfo::get_next_tmp_file_info_(tmp_file::ObSNTmpFileInfo &
|
||||
if (fd_idx_ >= fd_arr_.count()) {
|
||||
ret = OB_ITER_END;
|
||||
SERVER_LOG(INFO, "iterate current tenant reach end", K(fd_idx_), K(fd_arr_.count()));
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.get_tmp_file_info(fd_arr_.at(fd_idx_), tmp_file_info))) {
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.get_tmp_file_info(MTL_ID(), fd_arr_.at(fd_idx_), tmp_file_info))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret || OB_TIMEOUT == ret) {
|
||||
SERVER_LOG(INFO, "tmp file does not exist or is locked by others", KR(ret), K(fd_arr_.at(fd_idx_)));
|
||||
ret = OB_SUCCESS;
|
||||
@ -111,7 +111,7 @@ int ObAllVirtualTmpFileInfo::process_curr_tenant(common::ObNewRow *&row)
|
||||
if (OB_UNLIKELY(!fd_arr_.empty())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
SERVER_LOG(WARN, "unexpected fd_arr_", KR(ret), K(fd_arr_));
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.get_tmp_file_fds(fd_arr_))) {
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.get_tmp_file_fds(MTL_ID(), fd_arr_))) {
|
||||
SERVER_LOG(WARN, "fail to get tmp file fd arr", KR(ret));
|
||||
if (OB_NOT_INIT == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
|
@ -271,7 +271,7 @@ int ObDTLIntermResultManager::insert_interm_result_info(ObDTLIntermResultKey &ke
|
||||
// The code here is mainly for the use of the temp_table.
|
||||
// For the px module,
|
||||
// the dir_id has already been set in the previous access_mem_profile.
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.alloc_dir(dir_id_))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.alloc_dir(result_info->get_tenant_id(), dir_id_))) {
|
||||
LOG_WARN("allocate file directory failed", K(ret));
|
||||
} else {
|
||||
DTL_IR_STORE_DO(*result_info, set_dir_id, dir_id_);
|
||||
|
@ -243,7 +243,7 @@ int ObMergeGroupByOp::init_group_rows()
|
||||
int ObMergeGroupByOp::init()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(dir_id_))) {
|
||||
if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(ctx_.get_my_session()->get_effective_tenant_id(), dir_id_))) {
|
||||
LOG_WARN("failed to alloc dir id", K(ret));
|
||||
} else if (FALSE_IT(aggr_processor_.set_dir_id(dir_id_))) {
|
||||
} else if (FALSE_IT(aggr_processor_.set_io_event_observer(&io_event_observer_))) {
|
||||
|
@ -544,7 +544,7 @@ int ObMergeGroupByVecOp::init()
|
||||
int ret = OB_SUCCESS;
|
||||
group_rows_.set_tenant_id(MTL_ID());
|
||||
group_rows_.set_ctx_id(ObCtxIds::DEFAULT_CTX_ID);
|
||||
if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(dir_id_))) {
|
||||
if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(ctx_.get_my_session()->get_effective_tenant_id(), dir_id_))) {
|
||||
LOG_WARN("failed to alloc dir id", K(ret));
|
||||
} else if (FALSE_IT(aggr_processor_.set_dir_id(dir_id_))) {
|
||||
} else if (FALSE_IT(aggr_processor_.set_io_event_observer(&io_event_observer_))) {
|
||||
|
@ -27,9 +27,10 @@ OB_SERIALIZE_MEMBER((ObScalarAggregateSpec, ObGroupBySpec), enable_hash_base_dis
|
||||
int ObScalarAggregateOp::inner_open()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
uint64_t tenant_id = ctx_.get_my_session()->get_effective_tenant_id();
|
||||
if (OB_FAIL(ObGroupByOp::inner_open())) {
|
||||
LOG_WARN("failed to inner_open", K(ret));
|
||||
} else if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(dir_id_))) {
|
||||
} else if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(tenant_id, dir_id_))) {
|
||||
LOG_WARN("failed to alloc dir id", K(ret));
|
||||
} else if (FALSE_IT(aggr_processor_.set_dir_id(dir_id_))) {
|
||||
} else if (FALSE_IT(aggr_processor_.set_io_event_observer(&io_event_observer_))) {
|
||||
@ -40,7 +41,7 @@ int ObScalarAggregateOp::inner_open()
|
||||
LOG_WARN("failed to init one group", K(ret));
|
||||
} else {
|
||||
bool need_dir_id = aggr_processor_.processor_need_alloc_dir_id();
|
||||
if (need_dir_id && OB_FAIL(ObChunkStoreUtil::alloc_dir_id(dir_id_))) {
|
||||
if (need_dir_id && OB_FAIL(ObChunkStoreUtil::alloc_dir_id(tenant_id, dir_id_))) {
|
||||
LOG_WARN("failed to alloc dir id", K(ret));
|
||||
} else if (need_dir_id && FALSE_IT(aggr_processor_.set_dir_id(dir_id_))) {
|
||||
} else if (FALSE_IT(aggr_processor_.set_io_event_observer(&io_event_observer_))) {
|
||||
|
@ -27,7 +27,7 @@ int ObScalarAggregateVecOp::inner_open()
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(ObGroupByVecOp::inner_open())) {
|
||||
LOG_WARN("groupby inner open failed", K(ret));
|
||||
} else if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(dir_id_))) {
|
||||
} else if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(ctx_.get_my_session()->get_effective_tenant_id(), dir_id_))) {
|
||||
LOG_WARN("failed to allocate dir id", K(ret));
|
||||
} else if (OB_FAIL(init_mem_context())) {
|
||||
LOG_WARN("init memory context failed", K(ret));
|
||||
|
@ -570,7 +570,7 @@ ObChunkDatumStore::ObChunkDatumStore(const ObLabel &label, common::ObIAllocator
|
||||
}
|
||||
|
||||
int ObChunkDatumStore::init(int64_t mem_limit,
|
||||
uint64_t tenant_id /* = common::OB_SERVER_TENANT_ID */,
|
||||
uint64_t tenant_id,
|
||||
int64_t mem_ctx_id /* = common::ObCtxIds::DEFAULT_CTX_ID */,
|
||||
const char *label /* = common::ObModIds::OB_SQL_CHUNK_ROW_STORE) */,
|
||||
bool enable_dump /* = true */,
|
||||
@ -602,7 +602,7 @@ void ObChunkDatumStore::reset()
|
||||
int ret = OB_SUCCESS;
|
||||
if (is_file_open()) {
|
||||
aio_write_handle_.reset();
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.remove(io_.fd_))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.remove(tenant_id_, io_.fd_))) {
|
||||
LOG_WARN("remove file failed", K(ret), K_(io_.fd));
|
||||
} else {
|
||||
LOG_INFO("close file success", K(ret), K_(io_.fd));
|
||||
@ -2099,7 +2099,7 @@ int ObChunkDatumStore::get_timeout(int64_t &timeout_ms)
|
||||
int ObChunkDatumStore::alloc_dir_id()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (-1 == io_.dir_id_ && OB_FAIL(ObChunkStoreUtil::alloc_dir_id(io_.dir_id_))) {
|
||||
if (-1 == io_.dir_id_ && OB_FAIL(ObChunkStoreUtil::alloc_dir_id(tenant_id_, io_.dir_id_))) {
|
||||
LOG_WARN("allocate file directory failed", K(ret));
|
||||
}
|
||||
return ret;
|
||||
@ -2122,7 +2122,7 @@ int ObChunkDatumStore::write_file(void *buf, int64_t size)
|
||||
if (-1 == io_.dir_id_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("temp file dir id is not init", K(ret), K(io_.dir_id_));
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.open(io_.fd_, io_.dir_id_))) {
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.open(tenant_id_, io_.fd_, io_.dir_id_))) {
|
||||
LOG_WARN("open file failed", K(ret));
|
||||
} else {
|
||||
file_size_ = 0;
|
||||
@ -2137,7 +2137,7 @@ int ObChunkDatumStore::write_file(void *buf, int64_t size)
|
||||
set_io(size, static_cast<char *>(buf));
|
||||
if (aio_write_handle_.is_valid() && OB_FAIL(aio_write_handle_.wait())) {
|
||||
LOG_WARN("failed to wait write", K(ret));
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.aio_write(io_, aio_write_handle_))) {
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.aio_write(tenant_id_, io_, aio_write_handle_))) {
|
||||
LOG_WARN("write to file failed", K(ret), K_(io), K(timeout_ms));
|
||||
}
|
||||
}
|
||||
@ -2186,9 +2186,9 @@ int ObChunkDatumStore::read_file(
|
||||
tmp_io.io_timeout_ms_ = timeout_ms;
|
||||
|
||||
if (0 == read_size
|
||||
&& OB_FAIL(FILE_MANAGER_INSTANCE_V2.get_tmp_file_size(tmp_io.fd_, tmp_file_size))) {
|
||||
&& OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.get_tmp_file_size(tenant_id_, tmp_io.fd_, tmp_file_size))) {
|
||||
LOG_WARN("failed to get tmp file size", K(ret));
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.pread(tmp_io, offset, handle))) {
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.pread(tenant_id_, tmp_io, offset, handle))) {
|
||||
if (OB_ITER_END != ret) {
|
||||
LOG_WARN("read form file failed", K(ret), K(tmp_io), K(offset), K(timeout_ms));
|
||||
}
|
||||
@ -2220,7 +2220,7 @@ int ObChunkDatumStore::aio_read_file(
|
||||
tmp_io.io_desc_.set_wait_event(ObWaitEventIds::ROW_STORE_DISK_READ);
|
||||
if (OB_FAIL(get_timeout(tmp_io.io_timeout_ms_))) {
|
||||
LOG_WARN("get timeout failed", K(ret));
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.aio_pread(tmp_io, offset, handle))) {
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.aio_pread(tenant_id_, tmp_io, offset, handle))) {
|
||||
if (OB_ITER_END != ret) {
|
||||
LOG_WARN("read form file failed", K(ret), K(tmp_io), K(offset));
|
||||
}
|
||||
|
@ -969,7 +969,7 @@ public:
|
||||
virtual ~ObChunkDatumStore() { reset(); }
|
||||
|
||||
int init(int64_t mem_limit,
|
||||
uint64_t tenant_id = common::OB_SERVER_TENANT_ID,
|
||||
uint64_t tenant_id,
|
||||
int64_t mem_ctx_id = common::ObCtxIds::DEFAULT_CTX_ID,
|
||||
const char *label = common::ObModIds::OB_SQL_CHUNK_ROW_STORE,
|
||||
bool enable_dump = true,
|
||||
|
@ -242,7 +242,7 @@ ObChunkRowStore::ObChunkRowStore(common::ObIAllocator *alloc /* = NULL */)
|
||||
}
|
||||
|
||||
int ObChunkRowStore::init(int64_t mem_limit,
|
||||
uint64_t tenant_id /* = common::OB_SERVER_TENANT_ID */,
|
||||
uint64_t tenant_id,
|
||||
int64_t mem_ctx_id /* = common::ObCtxIds::DEFAULT_CTX_ID */,
|
||||
const char *label /* = common::ObNewModIds::OB_SQL_CHUNK_ROW_STORE) */,
|
||||
bool enable_dump /* = true */,
|
||||
@ -278,7 +278,6 @@ int ObChunkRowStore::init(int64_t mem_limit,
|
||||
void ObChunkRowStore::reset()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
tenant_id_ = common::OB_SERVER_TENANT_ID;
|
||||
label_ = common::ObModIds::OB_SQL_ROW_STORE;
|
||||
ctx_id_ = common::ObCtxIds::DEFAULT_CTX_ID;
|
||||
|
||||
@ -286,13 +285,14 @@ void ObChunkRowStore::reset()
|
||||
row_cnt_ = 0;
|
||||
|
||||
if (is_file_open()) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.remove(io_.fd_))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.remove(tenant_id_, io_.fd_))) {
|
||||
LOG_WARN("remove file failed", K(ret), K_(io_.fd));
|
||||
} else {
|
||||
LOG_TRACE("close file success", K(ret), K_(io_.fd));
|
||||
}
|
||||
io_.fd_ = -1;
|
||||
}
|
||||
tenant_id_ = common::OB_SERVER_TENANT_ID;
|
||||
n_block_in_file_ = 0;
|
||||
|
||||
while (!blocks_.is_empty()) {
|
||||
@ -348,7 +348,7 @@ void ObChunkRowStore::reuse()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (is_file_open()) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.remove(io_.fd_))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.remove(tenant_id_, io_.fd_))) {
|
||||
LOG_WARN("remove file failed", K(ret), K_(io_.fd));
|
||||
} else {
|
||||
LOG_TRACE("close file success", K(ret), K_(io_.fd));
|
||||
@ -1598,7 +1598,7 @@ int ObChunkRowStore::get_timeout(int64_t &timeout_ms)
|
||||
int ObChunkRowStore::alloc_dir_id()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (-1 == io_.dir_id_ && OB_FAIL(ObChunkStoreUtil::alloc_dir_id(io_.dir_id_))) {
|
||||
if (-1 == io_.dir_id_ && OB_FAIL(ObChunkStoreUtil::alloc_dir_id(tenant_id_, io_.dir_id_))) {
|
||||
LOG_WARN("allocate file directory failed", K(ret));
|
||||
}
|
||||
return ret;
|
||||
@ -1621,7 +1621,7 @@ int ObChunkRowStore::write_file(void *buf, int64_t size)
|
||||
if (-1 == io_.dir_id_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("temp file dir id is not init", K(ret), K(io_.dir_id_));
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.open(io_.fd_, io_.dir_id_))) {
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.open(tenant_id_, io_.fd_, io_.dir_id_))) {
|
||||
LOG_WARN("open file failed", K(ret));
|
||||
} else {
|
||||
file_size_ = 0;
|
||||
@ -1634,7 +1634,7 @@ int ObChunkRowStore::write_file(void *buf, int64_t size)
|
||||
}
|
||||
if (OB_SUCC(ret) && size > 0) {
|
||||
set_io(size, static_cast<char *>(buf));
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.write(io_))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.write(tenant_id_, io_))) {
|
||||
LOG_WARN("write to file failed", K(ret), K_(io), K(timeout_ms));
|
||||
}
|
||||
}
|
||||
@ -1673,9 +1673,9 @@ int ObChunkRowStore::read_file(void *buf, const int64_t size, const int64_t offs
|
||||
io_.io_timeout_ms_ = timeout_ms;
|
||||
tmp_file::ObTmpFileIOHandle handle;
|
||||
if (0 == read_size
|
||||
&& OB_FAIL(FILE_MANAGER_INSTANCE_V2.get_tmp_file_size(io_.fd_, tmp_file_size))) {
|
||||
&& OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.get_tmp_file_size(tenant_id_, io_.fd_, tmp_file_size))) {
|
||||
LOG_WARN("failed to get tmp file size", K(ret));
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.pread(io_, offset, handle))) {
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.pread(tenant_id_, io_, offset, handle))) {
|
||||
if (OB_ITER_END != ret) {
|
||||
LOG_WARN("read form file failed", K(ret), K(io_), K(offset), K(timeout_ms));
|
||||
}
|
||||
@ -1701,11 +1701,11 @@ bool ObChunkRowStore::need_dump(int64_t extra_size)
|
||||
return dump;
|
||||
}
|
||||
|
||||
int ObChunkStoreUtil::alloc_dir_id(int64_t &dir_id)
|
||||
int ObChunkStoreUtil::alloc_dir_id(const uint64_t tenant_id, int64_t &dir_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
dir_id = 0;
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.alloc_dir(dir_id))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.alloc_dir(tenant_id, dir_id))) {
|
||||
LOG_WARN("allocate file directory failed", K(ret));
|
||||
}
|
||||
return ret;
|
||||
|
@ -354,7 +354,7 @@ public:
|
||||
virtual ~ObChunkRowStore() { reset(); }
|
||||
|
||||
int init(int64_t mem_limit,
|
||||
uint64_t tenant_id = common::OB_SERVER_TENANT_ID,
|
||||
uint64_t tenant_id,
|
||||
int64_t mem_ctx_id = common::ObCtxIds::DEFAULT_CTX_ID,
|
||||
const char *label = common::ObModIds::OB_SQL_CHUNK_ROW_STORE,
|
||||
bool enable_dump = true,
|
||||
@ -535,7 +535,7 @@ inline int ObChunkRowStore::BlockBuffer::advance(int64_t size)
|
||||
class ObChunkStoreUtil
|
||||
{
|
||||
public:
|
||||
static int alloc_dir_id(int64_t &dir_id);
|
||||
static int alloc_dir_id(const uint64_t tenant_id, int64_t &dir_id);
|
||||
};
|
||||
|
||||
} // end namespace sql
|
||||
|
@ -358,7 +358,7 @@ ObRADatumStore::ObRADatumStore(common::ObIAllocator *alloc /* = NULL */)
|
||||
}
|
||||
|
||||
int ObRADatumStore::init(int64_t mem_limit,
|
||||
uint64_t tenant_id /* = common::OB_SERVER_TENANT_ID */,
|
||||
uint64_t tenant_id,
|
||||
int64_t mem_ctx_id /* = common::ObCtxIds::DEFAULT_CTX_ID */,
|
||||
const char *label /* = common::ObModIds::OB_SQL_ROW_STORE) */,
|
||||
uint32_t row_extend_size /* = 0 */)
|
||||
@ -398,7 +398,6 @@ void ObRADatumStore::inc_mem_hold(int64_t hold)
|
||||
void ObRADatumStore::reset()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
tenant_id_ = common::OB_SERVER_TENANT_ID;
|
||||
label_ = common::ObModIds::OB_SQL_ROW_STORE;
|
||||
ctx_id_ = common::ObCtxIds::DEFAULT_CTX_ID;
|
||||
mem_limit_ = 0;
|
||||
@ -415,7 +414,7 @@ void ObRADatumStore::reset()
|
||||
inner_reader_.reset();
|
||||
|
||||
if (is_file_open()) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.remove(fd_))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.remove(tenant_id_, fd_))) {
|
||||
LOG_WARN("remove file failed", K(ret), K_(fd));
|
||||
} else {
|
||||
LOG_INFO("close file success", K(ret), K_(fd));
|
||||
@ -424,6 +423,7 @@ void ObRADatumStore::reset()
|
||||
dir_id_ = -1;
|
||||
file_size_ = 0;
|
||||
}
|
||||
tenant_id_ = common::OB_SERVER_TENANT_ID;
|
||||
|
||||
while (!blk_mem_list_.is_empty()) {
|
||||
LinkNode *node = blk_mem_list_.remove_first();
|
||||
@ -445,7 +445,7 @@ void ObRADatumStore::reuse()
|
||||
row_cnt_ = 0;
|
||||
inner_reader_.reset();
|
||||
if (is_file_open()) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.remove(fd_))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.remove(tenant_id_, fd_))) {
|
||||
LOG_WARN("remove file failed", K(ret), K_(fd));
|
||||
} else {
|
||||
LOG_INFO("close file success", K(ret), K_(fd));
|
||||
@ -1253,9 +1253,9 @@ int ObRADatumStore::write_file(BlockIndex &bi, void *buf, int64_t size)
|
||||
LOG_WARN("get timeout failed", K(ret));
|
||||
} else {
|
||||
if (!is_file_open()) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.alloc_dir(dir_id_))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.alloc_dir(tenant_id_, dir_id_))) {
|
||||
LOG_WARN("alloc file directory failed", K(ret));
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.open(fd_, dir_id_))) {
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.open(tenant_id_, fd_, dir_id_))) {
|
||||
LOG_WARN("open file failed", K(ret));
|
||||
} else {
|
||||
file_size_ = 0;
|
||||
@ -1275,7 +1275,7 @@ int ObRADatumStore::write_file(BlockIndex &bi, void *buf, int64_t size)
|
||||
io.io_desc_.set_wait_event(ObWaitEventIds::ROW_STORE_DISK_WRITE);
|
||||
io.io_timeout_ms_ = timeout_ms;
|
||||
const uint64_t start = rdtsc();
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.write(io))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.write(tenant_id_, io))) {
|
||||
LOG_WARN("write to file failed", K(ret), K(io), K(timeout_ms));
|
||||
}
|
||||
if (NULL != io_observer_) {
|
||||
@ -1316,7 +1316,7 @@ int ObRADatumStore::read_file(void *buf, const int64_t size, const int64_t offse
|
||||
io.io_timeout_ms_ = timeout_ms;
|
||||
const uint64_t start = rdtsc();
|
||||
tmp_file::ObTmpFileIOHandle handle;
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.pread(io, offset, handle))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.pread(tenant_id_, io, offset, handle))) {
|
||||
LOG_WARN("read form file failed", K(ret), K(io), K(offset), K(timeout_ms));
|
||||
} else if (OB_UNLIKELY(handle.get_done_size() != size)) {
|
||||
ret = OB_INNER_STAT_ERROR;
|
||||
|
@ -344,7 +344,7 @@ public:
|
||||
virtual ~ObRADatumStore() { reset(); }
|
||||
|
||||
int init(int64_t mem_limit,
|
||||
uint64_t tenant_id = common::OB_SERVER_TENANT_ID,
|
||||
uint64_t tenant_id,
|
||||
int64_t mem_ctx_id = common::ObCtxIds::DEFAULT_CTX_ID,
|
||||
const char *label = common::ObModIds::OB_SQL_ROW_STORE,
|
||||
uint32_t row_extend_size = 0);
|
||||
|
@ -205,7 +205,7 @@ ObRARowStore::ObRARowStore(common::ObIAllocator *alloc /* = NULL */,
|
||||
}
|
||||
|
||||
int ObRARowStore::init(int64_t mem_limit,
|
||||
uint64_t tenant_id /* = common::OB_SERVER_TENANT_ID */,
|
||||
uint64_t tenant_id,
|
||||
int64_t mem_ctx_id /* = common::ObCtxIds::DEFAULT_CTX_ID */,
|
||||
const char *label /* = common::ObNewModIds::OB_SQL_ROW_STORE) */)
|
||||
{
|
||||
@ -226,7 +226,6 @@ int ObRARowStore::init(int64_t mem_limit,
|
||||
void ObRARowStore::reset()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
tenant_id_ = common::OB_SERVER_TENANT_ID;
|
||||
label_ = common::ObModIds::OB_SQL_ROW_STORE;
|
||||
ctx_id_ = common::ObCtxIds::DEFAULT_CTX_ID;
|
||||
mem_limit_ = 0;
|
||||
@ -239,7 +238,7 @@ void ObRARowStore::reset()
|
||||
inner_reader_.reset();
|
||||
|
||||
if (is_file_open()) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.remove(fd_))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.remove(tenant_id_, fd_))) {
|
||||
LOG_WARN("remove file failed", K(ret), K_(fd));
|
||||
} else {
|
||||
LOG_INFO("close file success", K(ret), K_(fd));
|
||||
@ -249,6 +248,7 @@ void ObRARowStore::reset()
|
||||
file_size_ = 0;
|
||||
}
|
||||
|
||||
tenant_id_ = common::OB_SERVER_TENANT_ID;
|
||||
while (!blk_mem_list_.is_empty()) {
|
||||
LinkNode *node = blk_mem_list_.remove_first();
|
||||
if (NULL != node) {
|
||||
@ -273,7 +273,7 @@ void ObRARowStore::reuse()
|
||||
row_cnt_ = 0;
|
||||
inner_reader_.reuse();
|
||||
if (is_file_open()) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.remove(fd_))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.remove(tenant_id_, fd_))) {
|
||||
LOG_WARN("remove file failed", K(ret), K_(fd));
|
||||
} else {
|
||||
LOG_INFO("close file success", K(ret), K_(fd));
|
||||
@ -950,9 +950,9 @@ int ObRARowStore::write_file(BlockIndex &bi, void *buf, int64_t size)
|
||||
LOG_WARN("get timeout failed", K(ret));
|
||||
} else {
|
||||
if (!is_file_open()) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.alloc_dir(dir_id_))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.alloc_dir(tenant_id_, dir_id_))) {
|
||||
LOG_WARN("alloc file directory failed", K(ret));
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.open(fd_, dir_id_))) {
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.open(tenant_id_, fd_, dir_id_))) {
|
||||
LOG_WARN("open file failed", K(ret));
|
||||
} else {
|
||||
file_size_ = 0;
|
||||
@ -968,7 +968,7 @@ int ObRARowStore::write_file(BlockIndex &bi, void *buf, int64_t size)
|
||||
io.size_ = size;
|
||||
io.io_desc_.set_wait_event(ObWaitEventIds::ROW_STORE_DISK_WRITE);
|
||||
io.io_timeout_ms_ = timeout_ms;
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.write(io))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.write(tenant_id_, io))) {
|
||||
LOG_WARN("write to file failed", K(ret), K(io), K(timeout_ms));
|
||||
}
|
||||
}
|
||||
@ -1003,7 +1003,7 @@ int ObRARowStore::read_file(void *buf, const int64_t size, const int64_t offset)
|
||||
io.io_desc_.set_wait_event(ObWaitEventIds::ROW_STORE_DISK_READ);
|
||||
io.io_timeout_ms_ = timeout_ms;
|
||||
tmp_file::ObTmpFileIOHandle handle;
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.pread(io, offset, handle))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.pread(tenant_id_, io, offset, handle))) {
|
||||
LOG_WARN("read form file failed", K(ret), K(io), K(offset), K(timeout_ms));
|
||||
} else if (handle.get_done_size() != size) {
|
||||
ret = OB_INNER_STAT_ERROR;
|
||||
|
@ -219,7 +219,7 @@ public:
|
||||
virtual ~ObRARowStore() { reset(); }
|
||||
|
||||
int init(int64_t mem_limit,
|
||||
uint64_t tenant_id = common::OB_SERVER_TENANT_ID,
|
||||
uint64_t tenant_id,
|
||||
int64_t mem_ctx_id = common::ObCtxIds::DEFAULT_CTX_ID,
|
||||
const char *label = common::ObModIds::OB_SQL_ROW_STORE);
|
||||
|
||||
|
@ -103,7 +103,7 @@ void ObTempBlockStore::reset()
|
||||
|
||||
if (is_file_open()) {
|
||||
write_io_handle_.reset();
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.remove(io_.fd_))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.remove(tenant_id_, io_.fd_))) {
|
||||
LOG_WARN("remove file failed", K(ret), K_(io_.fd));
|
||||
} else {
|
||||
LOG_INFO("close file success", K(ret), K_(io_.fd), K_(file_size));
|
||||
@ -126,7 +126,7 @@ void ObTempBlockStore::reuse()
|
||||
inner_reader_.reset();
|
||||
if (is_file_open()) {
|
||||
write_io_handle_.reset();
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.remove(io_.fd_))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.remove(tenant_id_, io_.fd_))) {
|
||||
LOG_WARN("remove file failed", K(ret), K_(io_.fd));
|
||||
} else {
|
||||
LOG_INFO("close file success", K(ret), K_(io_.fd), K_(file_size));
|
||||
@ -178,7 +178,7 @@ int ObTempBlockStore::alloc_dir_id()
|
||||
int ret = OB_SUCCESS;
|
||||
if (-1 == io_.dir_id_) {
|
||||
io_.dir_id_ = 0;
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.alloc_dir(io_.dir_id_))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.alloc_dir(tenant_id_, io_.dir_id_))) {
|
||||
LOG_WARN("allocate file directory failed", K(ret));
|
||||
}
|
||||
}
|
||||
@ -1090,7 +1090,7 @@ int ObTempBlockStore::write_file(BlockIndex &bi, void *buf, int64_t size)
|
||||
if (!is_file_open()) {
|
||||
if (OB_FAIL(alloc_dir_id())) {
|
||||
LOG_WARN("alloc file directory failed", K(ret));
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.open(io_.fd_, io_.dir_id_))) {
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.open(tenant_id_, io_.fd_, io_.dir_id_))) {
|
||||
LOG_WARN("open file failed", K(ret));
|
||||
} else {
|
||||
file_size_ = 0;
|
||||
@ -1107,7 +1107,7 @@ int ObTempBlockStore::write_file(BlockIndex &bi, void *buf, int64_t size)
|
||||
const uint64_t start = rdtsc();
|
||||
if (write_io_handle_.is_valid() && OB_FAIL(write_io_handle_.wait())) {
|
||||
LOG_WARN("fail to wait write", K(ret), K(write_io_handle_));
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.aio_write(io_, write_io_handle_))) {
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.aio_write(tenant_id_, io_, write_io_handle_))) {
|
||||
LOG_WARN("write to file failed", K(ret), K_(io), K(timeout_ms));
|
||||
}
|
||||
if (NULL != io_observer_) {
|
||||
@ -1147,11 +1147,11 @@ int ObTempBlockStore::read_file(void *buf, const int64_t size, const int64_t off
|
||||
tmp_read_id.io_timeout_ms_ = timeout_ms;
|
||||
const uint64_t start = rdtsc();
|
||||
if (is_async) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.aio_pread(tmp_read_id, offset, handle))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.aio_pread(tenant_id_, tmp_read_id, offset, handle))) {
|
||||
LOG_WARN("read form file failed", K(ret), K(tmp_read_id), K(offset), K(timeout_ms));
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.pread(tmp_read_id, offset, handle))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.pread(tenant_id_, tmp_read_id, offset, handle))) {
|
||||
LOG_WARN("read form file failed", K(ret), K(tmp_read_id), K(offset), K(timeout_ms));
|
||||
} else if (OB_UNLIKELY(handle.get_done_size() != size)) {
|
||||
ret = OB_INNER_STAT_ERROR;
|
||||
@ -1597,7 +1597,7 @@ int ObTempBlockStore::truncate_file(int64_t offset)
|
||||
if (!is_inited()) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret));
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.truncate(get_file_fd(), offset))) {
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.truncate(tenant_id_, get_file_fd(), offset))) {
|
||||
LOG_WARN("truncate failed", K(ret), K(get_file_fd()), K(offset));
|
||||
}
|
||||
return ret;
|
||||
|
@ -316,7 +316,7 @@ int ObSqlMemMgrProcessor::alloc_dir_id(int64_t &dir_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (0 == dir_id_) {
|
||||
if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(dir_id_))) {
|
||||
if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(tenant_id_, dir_id_))) {
|
||||
LOG_WARN("failed to alloc dir id", K(ret));
|
||||
}
|
||||
}
|
||||
|
@ -1229,7 +1229,7 @@ int ObWindowFunctionOp::init()
|
||||
func_alloc.local_allocator_ = &local_allocator_;
|
||||
int64_t prev_pushdown_pby_col_count = -1;
|
||||
WFInfoFixedArray &wf_infos = *const_cast<WFInfoFixedArray *>(&MY_SPEC.wf_infos_);
|
||||
if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(dir_id_))) {
|
||||
if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(tenant_id, dir_id_))) {
|
||||
LOG_WARN("failed to alloc dir id", K(ret));
|
||||
} else if (OB_FAIL(curr_row_collect_values_.prepare_allocate(wf_infos.count()))) {
|
||||
LOG_WARN("cur row collect values prepare allocate failed", K(ret));
|
||||
|
@ -698,7 +698,7 @@ int ObWindowFunctionVecOp::init()
|
||||
all_part_exprs_.set_attr(attr);
|
||||
int prev_pushdown_pby_col_count = -1;
|
||||
WFInfoFixedArray &wf_infos = const_cast<WFInfoFixedArray &>(MY_SPEC.wf_infos_);
|
||||
if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(dir_id_))) {
|
||||
if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(tenant_id, dir_id_))) {
|
||||
LOG_WARN("failed to alloc dir id", K(ret));
|
||||
} else if (MY_SPEC.max_batch_size_ > 0) {
|
||||
if (OB_FAIL(all_expr_vector_copy_.init(child_->get_spec().output_, eval_ctx_))) {
|
||||
|
@ -2477,7 +2477,7 @@ int ObTabletFullDirectLoadMgr::open(const int64_t current_execution_id, share::S
|
||||
} else if (OB_UNLIKELY(!tablet_handle.is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("tablet handle is invalid", K(ret), K(tablet_handle));
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.alloc_dir(dir_id_))) {
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.alloc_dir(MTL_ID(), dir_id_))) {
|
||||
LOG_WARN("alloc dir id failed", K(ret));
|
||||
} else if (current_execution_id < execution_id_
|
||||
|| current_execution_id < tablet_handle.get_obj()->get_tablet_meta().ddl_execution_id_) {
|
||||
|
@ -195,8 +195,8 @@ int ObDirectLoadTmpFileIOHandle::open(const ObDirectLoadTmpFileHandle &file_hand
|
||||
if (OB_UNLIKELY(!file_handle.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), K(file_handle));
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.get_tmp_file_size(tmp_file->get_file_id().fd_,
|
||||
file_size))) {
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.get_tmp_file_size(
|
||||
MTL_ID(), tmp_file->get_file_id().fd_, file_size))) {
|
||||
LOG_WARN("fail to get tmp file size", KR(ret), KPC(tmp_file));
|
||||
} else if (OB_UNLIKELY(file_size != tmp_file->get_file_size())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -251,7 +251,7 @@ int ObDirectLoadTmpFileIOHandle::pread(char *buf, int64_t size, int64_t offset)
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(check_status())) {
|
||||
LOG_WARN("fail to check status", KR(ret));
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.pread(io_info_, offset, file_io_handle_))) {
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.pread(MTL_ID(), io_info_, offset, file_io_handle_))) {
|
||||
LOG_WARN("fail to do pread from tmp file", KR(ret), K_(io_info), K(offset));
|
||||
if (OB_LIKELY(is_retry_err(ret))) {
|
||||
if (++retry_cnt <= MAX_RETRY_CNT) {
|
||||
@ -290,14 +290,14 @@ int ObDirectLoadTmpFileIOHandle::write(char *buf, int64_t size)
|
||||
LOG_WARN("fail to check status", KR(ret));
|
||||
}
|
||||
// TODO(suzhi.yt): 先保留原来的调用, aio_write提交成功就相当于写成功了
|
||||
else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.aio_write(io_info_, file_io_handle_))) {
|
||||
else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.aio_write(MTL_ID(), io_info_, file_io_handle_))) {
|
||||
LOG_WARN("fail to do aio write to tmp file", KR(ret), K_(io_info));
|
||||
if (OB_LIKELY(is_retry_err(ret))) {
|
||||
if (++retry_cnt <= MAX_RETRY_CNT) {
|
||||
ret = OB_SUCCESS;
|
||||
int64_t new_file_size = 0;
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.get_tmp_file_size(tmp_file_->get_file_id().fd_,
|
||||
new_file_size))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.get_tmp_file_size(
|
||||
MTL_ID(), tmp_file_->get_file_id().fd_, new_file_size))) {
|
||||
LOG_WARN("fail to get tmp file size", KR(ret), KPC_(tmp_file));
|
||||
} else {
|
||||
const int64_t write_size = new_file_size - tmp_file_->get_file_size();
|
||||
@ -342,7 +342,7 @@ int ObDirectLoadTmpFileIOHandle::aio_pread(char *buf, int64_t size, int64_t offs
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(check_status())) {
|
||||
LOG_WARN("fail to check status", KR(ret));
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.aio_pread(io_info_, offset, file_io_handle_))) {
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.aio_pread(MTL_ID(), io_info_, offset, file_io_handle_))) {
|
||||
LOG_WARN("fail to do aio pread from tmp file", KR(ret), K_(io_info), K(offset));
|
||||
if (OB_LIKELY(is_retry_err(ret))) {
|
||||
if (++retry_cnt <= MAX_RETRY_CNT) {
|
||||
@ -381,14 +381,14 @@ int ObDirectLoadTmpFileIOHandle::aio_write(char *buf, int64_t size)
|
||||
LOG_WARN("fail to check status", KR(ret));
|
||||
}
|
||||
// aio_write提交成功就相当于写成功了
|
||||
else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.aio_write(io_info_, file_io_handle_))) {
|
||||
else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.aio_write(MTL_ID(), io_info_, file_io_handle_))) {
|
||||
LOG_WARN("fail to do aio write to tmp file", KR(ret), K_(io_info));
|
||||
if (OB_LIKELY(is_retry_err(ret))) {
|
||||
if (++retry_cnt <= MAX_RETRY_CNT) {
|
||||
ret = OB_SUCCESS;
|
||||
int64_t new_file_size = 0;
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.get_tmp_file_size(tmp_file_->get_file_id().fd_,
|
||||
new_file_size))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.get_tmp_file_size(
|
||||
MTL_ID(), tmp_file_->get_file_id().fd_, new_file_size))) {
|
||||
LOG_WARN("fail to get tmp file size", KR(ret), KPC_(tmp_file));
|
||||
} else {
|
||||
const int64_t write_size = new_file_size - tmp_file_->get_file_size();
|
||||
@ -478,7 +478,7 @@ int ObDirectLoadTmpFileManager::alloc_dir(int64_t &dir_id)
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObDirectLoadTmpFileManager not init", KR(ret), KP(this));
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.alloc_dir(dir_id))) {
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.alloc_dir(MTL_ID(), dir_id))) {
|
||||
LOG_WARN("fail to alloc dir", KR(ret));
|
||||
}
|
||||
return ret;
|
||||
@ -498,7 +498,7 @@ int ObDirectLoadTmpFileManager::alloc_file(int64_t dir_id,
|
||||
ObDirectLoadTmpFile *tmp_file = nullptr;
|
||||
ObDirectLoadTmpFileId file_id;
|
||||
file_id.dir_id_ = dir_id;
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.open(file_id.fd_, file_id.dir_id_))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.open(MTL_ID(), file_id.fd_, file_id.dir_id_))) {
|
||||
LOG_WARN("fail to open file", KR(ret));
|
||||
} else if (OB_ISNULL(tmp_file = file_allocator_.alloc(this, file_id))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
@ -512,7 +512,7 @@ int ObDirectLoadTmpFileManager::alloc_file(int64_t dir_id,
|
||||
tmp_file = nullptr;
|
||||
}
|
||||
if (file_id.is_valid()) {
|
||||
FILE_MANAGER_INSTANCE_V2.remove(file_id.fd_);
|
||||
FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.remove(MTL_ID(), file_id.fd_);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -531,7 +531,7 @@ void ObDirectLoadTmpFileManager::put_file(ObDirectLoadTmpFile *tmp_file)
|
||||
} else {
|
||||
const int64_t ref_count = tmp_file->get_ref_count();
|
||||
if (0 == ref_count) {
|
||||
FILE_MANAGER_INSTANCE_V2.remove(tmp_file->get_file_id().fd_);
|
||||
FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.remove(MTL_ID(), tmp_file->get_file_id().fd_);
|
||||
file_allocator_.free(tmp_file);
|
||||
} else {
|
||||
LOG_ERROR("tmp file ref count must be zero", K(ref_count));
|
||||
|
@ -236,7 +236,7 @@ int ObFragmentWriterV2<T>::open(const int64_t buf_size, const int64_t expire_tim
|
||||
if (NULL == (buf_ = static_cast<char *>(allocator_.alloc(align_buf_size)))) {
|
||||
ret = common::OB_ALLOCATE_MEMORY_FAILED;
|
||||
STORAGE_LOG(WARN, "fail to allocate buffer", K(ret), K(align_buf_size));
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.open(fd_, dir_id_))) {
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.open(tenant_id, fd_, dir_id_))) {
|
||||
STORAGE_LOG(WARN, "fail to open file", K(ret));
|
||||
} else {
|
||||
buf_size_ = align_buf_size;
|
||||
@ -318,7 +318,7 @@ int ObFragmentWriterV2<T>::flush_buffer()
|
||||
io_info.buf_ = buf_;
|
||||
io_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_INDEX_BUILD_WRITE);
|
||||
io_info.io_timeout_ms_ = timeout_ms;
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.aio_write(io_info, file_io_handle_))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.aio_write(tenant_id_, io_info, file_io_handle_))) {
|
||||
STORAGE_LOG(WARN, "fail to do aio write macro file", K(ret), K(io_info));
|
||||
} else {
|
||||
macro_buffer_writer_.assign(ObExternalSortConstant::BUF_HEADER_LENGTH, buf_size_, buf_);
|
||||
@ -565,7 +565,7 @@ int ObFragmentReaderV2<T>::prefetch()
|
||||
io_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_INDEX_BUILD_READ);
|
||||
if (OB_FAIL(ObExternalSortConstant::get_io_timeout_ms(expire_timestamp_, io_info.io_timeout_ms_))) {
|
||||
STORAGE_LOG(WARN, "fail to get io timeout ms", K(ret), K(expire_timestamp_), K(io_info.io_timeout_ms_));
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.aio_read(io_info, file_io_handles_[handle_index]))) {
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.aio_read(tenant_id_, io_info, file_io_handles_[handle_index]))) {
|
||||
if (common::OB_ITER_END != ret) {
|
||||
STORAGE_LOG(WARN, "fail to do aio read from macro file", K(ret), K(fd_), K(io_info));
|
||||
} else {
|
||||
@ -684,7 +684,7 @@ int ObFragmentReaderV2<T>::clean_up()
|
||||
for (int64_t i = 0; i < MAX_HANDLE_COUNT; ++i) {
|
||||
file_io_handles_[i].reset();
|
||||
}
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.remove(fd_))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.remove(tenant_id_, fd_))) {
|
||||
STORAGE_LOG(WARN, "fail to remove macro file", K(ret));
|
||||
}
|
||||
reset();
|
||||
@ -1070,7 +1070,7 @@ int ObExternalSortRound<T, Compare>::init(
|
||||
ret = common::OB_INVALID_ARGUMENT;
|
||||
STORAGE_LOG(WARN, "invalid argument", K(ret), K(merge_count), K(file_buf_size),
|
||||
KP(compare));
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.alloc_dir(dir_id_))) {
|
||||
} else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.alloc_dir(tenant_id, dir_id_))) {
|
||||
STORAGE_LOG(WARN, "fail to alloc dir", K(ret));
|
||||
} else {
|
||||
is_inited_ = true;
|
||||
|
@ -286,7 +286,7 @@ ObSharedNothingTmpFile::ObSharedNothingTmpFile()
|
||||
|
||||
ObSharedNothingTmpFile::~ObSharedNothingTmpFile()
|
||||
{
|
||||
destroy();
|
||||
reset();
|
||||
}
|
||||
|
||||
int ObSharedNothingTmpFile::init(const uint64_t tenant_id, const int64_t fd, const int64_t dir_id,
|
||||
@ -344,13 +344,13 @@ int ObSharedNothingTmpFile::init(const uint64_t tenant_id, const int64_t fd, con
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSharedNothingTmpFile::destroy()
|
||||
int ObSharedNothingTmpFile::release_resource()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t fd_backup = fd_;
|
||||
int64_t free_cnt = 0;
|
||||
|
||||
LOG_INFO("tmp file destroy start", KR(ret), K(fd_), KPC(this));
|
||||
LOG_INFO("tmp file release_resource start", KR(ret), K(fd_), KPC(this));
|
||||
|
||||
if (cached_page_nums_ > 0) {
|
||||
uint32_t cur_page_id = begin_page_id_;
|
||||
@ -360,7 +360,7 @@ int ObSharedNothingTmpFile::destroy()
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("begin page virtual id is invalid", KR(ret), K(fd_), K(begin_page_virtual_id_));
|
||||
} else if (OB_FAIL(wbp_->free_page(fd_, cur_page_id, ObTmpFilePageUniqKey(begin_page_virtual_id_), next_page_id))) {
|
||||
LOG_WARN("fail to free page", KR(ret), K(fd_), K(cur_page_id), K(begin_page_virtual_id_));
|
||||
LOG_ERROR("fail to free page", KR(ret), K(fd_), K(cur_page_id), K(begin_page_virtual_id_));
|
||||
} else {
|
||||
free_cnt++;
|
||||
cur_page_id = next_page_id;
|
||||
@ -369,22 +369,16 @@ int ObSharedNothingTmpFile::destroy()
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && cached_page_nums_ != free_cnt) {
|
||||
LOG_ERROR("tmp file destroy, cached_page_nums_ and free_cnt are not equal", KR(ret), K(fd_), K(free_cnt), KPC(this));
|
||||
LOG_ERROR("tmp file release resource, cached_page_nums_ and free_cnt are not equal", KR(ret), K(fd_), K(free_cnt), KPC(this));
|
||||
}
|
||||
|
||||
LOG_INFO("tmp file destroy, free wbp page phase over", KR(ret), K(fd_), KPC(this));
|
||||
LOG_INFO("tmp file release resource, free wbp page phase over", KR(ret), K(fd_), KPC(this));
|
||||
|
||||
if (FAILEDx(meta_tree_.clear(truncated_offset_, file_size_))) {
|
||||
LOG_WARN("fail to clear", KR(ret), K(fd_), K(truncated_offset_), K(file_size_));
|
||||
LOG_ERROR("fail to clear meta tree", KR(ret), K(fd_), K(truncated_offset_), K(file_size_));
|
||||
}
|
||||
|
||||
LOG_INFO("tmp file destroy, meta_tree_ clear phase over", KR(ret), K(fd_), KPC(this));
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
reset();
|
||||
}
|
||||
|
||||
LOG_INFO("tmp file destroy over", KR(ret), "fd", fd_backup);
|
||||
LOG_INFO("tmp file release resource over", KR(ret), "fd", fd_);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -216,7 +216,7 @@ public:
|
||||
ObIAllocator *wbp_index_cache_bkt_allocator,
|
||||
ObTmpFilePageCacheController *page_cache_controller,
|
||||
const char* label);
|
||||
int destroy();
|
||||
int release_resource();
|
||||
void reset();
|
||||
bool can_remove();
|
||||
bool is_deleting();
|
||||
|
@ -19,6 +19,9 @@ namespace oceanbase
|
||||
{
|
||||
namespace tmp_file
|
||||
{
|
||||
int64_t ObSNTenantTmpFileManager::current_fd_ = ObTmpFileGlobal::INVALID_TMP_FILE_FD;
|
||||
int64_t ObSNTenantTmpFileManager::current_dir_id_ = ObTmpFileGlobal::INVALID_TMP_FILE_DIR_ID;
|
||||
|
||||
ObSNTenantTmpFileManager::ObSNTenantTmpFileManager()
|
||||
: is_inited_(false),
|
||||
tenant_id_(OB_INVALID_TENANT_ID),
|
||||
@ -30,9 +33,7 @@ ObSNTenantTmpFileManager::ObSNTenantTmpFileManager()
|
||||
wbp_index_cache_bucket_allocator_(),
|
||||
files_(),
|
||||
tmp_file_block_manager_(),
|
||||
page_cache_controller_(tmp_file_block_manager_),
|
||||
current_fd_(ObTmpFileGlobal::INVALID_TMP_FILE_FD),
|
||||
current_dir_id_(ObTmpFileGlobal::INVALID_TMP_FILE_DIR_ID)
|
||||
page_cache_controller_(tmp_file_block_manager_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -111,18 +112,42 @@ void ObSNTenantTmpFileManager::wait()
|
||||
|
||||
void ObSNTenantTmpFileManager::destroy()
|
||||
{
|
||||
last_access_tenant_config_ts_ = -1;
|
||||
last_meta_mem_limit_ = META_DEFAULT_LIMIT;
|
||||
page_cache_controller_.destroy();
|
||||
files_.destroy();
|
||||
tmp_file_block_manager_.destroy();
|
||||
tmp_file_allocator_.reset();
|
||||
callback_allocator_.reset();
|
||||
wbp_index_cache_allocator_.reset();
|
||||
wbp_index_cache_bucket_allocator_.reset();
|
||||
is_inited_ = false;
|
||||
current_fd_ = ObTmpFileGlobal::INVALID_TMP_FILE_FD;
|
||||
current_dir_id_ = ObTmpFileGlobal::INVALID_TMP_FILE_DIR_ID;
|
||||
if (is_inited_) {
|
||||
is_inited_ = false;
|
||||
last_access_tenant_config_ts_ = -1;
|
||||
last_meta_mem_limit_ = META_DEFAULT_LIMIT;
|
||||
page_cache_controller_.destroy();
|
||||
int64_t curr_file_cnt = files_.count();
|
||||
if (OB_UNLIKELY(curr_file_cnt > 0)) {
|
||||
int ret = OB_SUCCESS;
|
||||
TmpFileMap::BlurredIterator iter(files_);
|
||||
while (OB_SUCC(ret)) {
|
||||
ObTmpFileKey unused_key(ObTmpFileGlobal::INVALID_TMP_FILE_FD);
|
||||
ObTmpFileHandle handle;
|
||||
if (OB_FAIL(iter.next(unused_key, handle))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
} else {
|
||||
LOG_WARN("fail to get next tmp file", KR(ret), K(tenant_id_));
|
||||
}
|
||||
} else {
|
||||
LOG_ERROR("the tmp file has not been removed when tmp file mgr is destroying", KPC(handle.get()));
|
||||
}
|
||||
} // end while
|
||||
|
||||
int64_t new_file_cnt = files_.count();
|
||||
if (OB_UNLIKELY(new_file_cnt != curr_file_cnt)) {
|
||||
LOG_ERROR("there are some operation for tmp files when tmp file mgr is destroying", K(tenant_id_), K(curr_file_cnt));
|
||||
}
|
||||
}
|
||||
files_.destroy();
|
||||
tmp_file_block_manager_.destroy();
|
||||
tmp_file_allocator_.reset();
|
||||
callback_allocator_.reset();
|
||||
wbp_index_cache_allocator_.reset();
|
||||
wbp_index_cache_bucket_allocator_.reset();
|
||||
}
|
||||
|
||||
LOG_INFO("ObSNTenantTmpFileManager destroy", K(tenant_id_), KP(this));
|
||||
}
|
||||
@ -220,8 +245,12 @@ int ObSNTenantTmpFileManager::remove(const int64_t fd)
|
||||
usleep(100 * 1000); // 100ms
|
||||
}
|
||||
}
|
||||
tmp_file_handle.reset();
|
||||
tmp_file_allocator_.free(tmp_file);
|
||||
if (OB_FAIL(tmp_file->release_resource())) {
|
||||
LOG_ERROR("fail to release resource", KR(ret), KP(tmp_file), KPC(tmp_file), K(lbt()));
|
||||
} else {
|
||||
tmp_file_handle.reset();
|
||||
tmp_file_allocator_.free(tmp_file);
|
||||
}
|
||||
}
|
||||
|
||||
LOG_INFO("remove a tmp file over", KR(ret), K(start_remove_ts), K(fd), K(lbt()));
|
||||
|
@ -56,7 +56,7 @@ public:
|
||||
void destroy();
|
||||
|
||||
int alloc_dir(int64_t &dir_id);
|
||||
int open(int64_t &fd, const int64_t &dir_id, const char* const label = nullptr);
|
||||
int open(int64_t &fd, const int64_t &dir_id, const char* const label);
|
||||
int remove(const int64_t fd);
|
||||
|
||||
void refresh_meta_memory_limit();
|
||||
@ -116,8 +116,8 @@ private:
|
||||
ObTmpFileBlockManager tmp_file_block_manager_;
|
||||
ObTmpFilePageCacheController page_cache_controller_;
|
||||
|
||||
int64_t current_fd_;
|
||||
int64_t current_dir_id_;
|
||||
static int64_t current_fd_;
|
||||
static int64_t current_dir_id_;
|
||||
};
|
||||
|
||||
} // end namespace tmp_file
|
||||
|
@ -23,12 +23,37 @@ namespace tmp_file
|
||||
{
|
||||
void ObTmpFileEvictionManager::destroy()
|
||||
{
|
||||
// int ret = OB_SUCCESS;
|
||||
// ObSharedNothingTmpFile *file = nullptr;
|
||||
{
|
||||
ObSpinLockGuard guard(meta_list_lock_);
|
||||
// while (!file_meta_eviction_list_.is_empty()) {
|
||||
// ObTmpFileHandle file_handle;
|
||||
// if (OB_ISNULL(file = &file_meta_eviction_list_.remove_first()->file_)) {
|
||||
// ret = OB_ERR_UNEXPECTED;
|
||||
// LOG_WARN("file is null", KR(ret));
|
||||
// } else if (OB_FAIL(file_handle.init(file))) {
|
||||
// LOG_WARN("fail to init file handle", KR(ret), KP(file));
|
||||
// } else {
|
||||
// file->dec_ref_cnt();
|
||||
// }
|
||||
// }
|
||||
file_meta_eviction_list_.reset();
|
||||
}
|
||||
|
||||
{
|
||||
ObSpinLockGuard guard(data_list_lock_);
|
||||
// while (!file_data_eviction_list_.is_empty()) {
|
||||
// ObTmpFileHandle file_handle;
|
||||
// if (OB_ISNULL(file = &file_data_eviction_list_.remove_first()->file_)) {
|
||||
// ret = OB_ERR_UNEXPECTED;
|
||||
// LOG_WARN("file is null", KR(ret));
|
||||
// } else if (OB_FAIL(file_handle.init(file))) {
|
||||
// LOG_WARN("fail to init file handle", KR(ret), KP(file));
|
||||
// } else {
|
||||
// file->dec_ref_cnt();
|
||||
// }
|
||||
// }
|
||||
file_data_eviction_list_.reset();
|
||||
}
|
||||
}
|
||||
|
@ -42,13 +42,37 @@ int ObTmpFileFlushPriorityManager::init()
|
||||
|
||||
void ObTmpFileFlushPriorityManager::destroy()
|
||||
{
|
||||
// int ret = OB_SUCCESS;
|
||||
// ObSharedNothingTmpFile *file = nullptr;
|
||||
is_inited_ = false;
|
||||
for (int64_t i = 0; i < FileList::MAX; i++) {
|
||||
ObSpinLockGuard guard(data_list_locks_[i]);
|
||||
// while (!data_flush_lists_[i].is_empty()) {
|
||||
// ObTmpFileHandle file_handle;
|
||||
// if (OB_ISNULL(file = &data_flush_lists_[i].remove_first()->file_)) {
|
||||
// ret = OB_ERR_UNEXPECTED;
|
||||
// LOG_WARN("file is null", KR(ret));
|
||||
// } else if (OB_FAIL(file_handle.init(file))) {
|
||||
// LOG_WARN("fail to init file handle", KR(ret), KP(file));
|
||||
// } else {
|
||||
// file->dec_ref_cnt();
|
||||
// }
|
||||
// }
|
||||
data_flush_lists_[i].reset();
|
||||
}
|
||||
for (int64_t i = 0; i < FileList::MAX; i++) {
|
||||
ObSpinLockGuard guard(meta_list_locks_[i]);
|
||||
// while (!meta_flush_lists_[i].is_empty()) {
|
||||
// ObTmpFileHandle file_handle;
|
||||
// if (OB_ISNULL(file = &meta_flush_lists_[i].remove_first()->file_)) {
|
||||
// ret = OB_ERR_UNEXPECTED;
|
||||
// LOG_WARN("file is null", KR(ret));
|
||||
// } else if (OB_FAIL(file_handle.init(file))) {
|
||||
// LOG_WARN("fail to init file handle", KR(ret), KP(file));
|
||||
// } else {
|
||||
// file->dec_ref_cnt();
|
||||
// }
|
||||
// }
|
||||
meta_flush_lists_[i].reset();
|
||||
}
|
||||
}
|
||||
|
@ -30,19 +30,6 @@ int ObTenantTmpFileManager::mtl_init(ObTenantTmpFileManager *&manager)
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObTenantTmpFileManager &ObTenantTmpFileManager::get_instance()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTenantTmpFileManager *tmp_file_manager = MTL(tmp_file::ObTenantTmpFileManager *);
|
||||
if (OB_ISNULL(tmp_file_manager)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("ObTenantTmpFileManager is null, please check whether MTL module is normal", KR(ret));
|
||||
ob_abort();
|
||||
}
|
||||
|
||||
return *tmp_file_manager;
|
||||
}
|
||||
|
||||
int ObTenantTmpFileManager::init()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -162,8 +149,8 @@ int ObTenantTmpFileManager::open(int64_t &fd, const int64_t &dir_id, const char*
|
||||
}
|
||||
#endif
|
||||
} else {
|
||||
if (OB_FAIL(sn_file_manager_.open(fd, dir_id))) {
|
||||
LOG_WARN("fail to open file in sn tmp file manager", KR(ret), K(fd), K(dir_id));
|
||||
if (OB_FAIL(sn_file_manager_.open(fd, dir_id, label))) {
|
||||
LOG_WARN("fail to open file in sn tmp file manager", KR(ret), K(fd), K(dir_id), KP(label));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -390,5 +377,338 @@ int ObTenantTmpFileManager::get_tmp_file_info(const int64_t fd, ObSNTmpFileInfo
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObTenantTmpFileManagerWithMTLSwitch &ObTenantTmpFileManagerWithMTLSwitch::get_instance()
|
||||
{
|
||||
static ObTenantTmpFileManagerWithMTLSwitch mgr;
|
||||
|
||||
return mgr;
|
||||
}
|
||||
|
||||
int ObTenantTmpFileManagerWithMTLSwitch::alloc_dir(const uint64_t tenant_id, int64_t &dir_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
|
||||
if (tenant_id != MTL_ID()) {
|
||||
if (OB_FAIL(guard.switch_to(tenant_id))) {
|
||||
LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*);
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_ISNULL(tmp_file_mgr)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(tmp_file_mgr->alloc_dir(dir_id))) {
|
||||
LOG_WARN("fail to alloc dir", KR(ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantTmpFileManagerWithMTLSwitch::open(const uint64_t tenant_id, int64_t &fd, const int64_t &dir_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
|
||||
if (tenant_id != MTL_ID()) {
|
||||
if (OB_FAIL(guard.switch_to(tenant_id))) {
|
||||
LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*);
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_ISNULL(tmp_file_mgr)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(tmp_file_mgr->open(fd, dir_id))) {
|
||||
LOG_WARN("fail to open", KR(ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantTmpFileManagerWithMTLSwitch::remove(const uint64_t tenant_id, const int64_t fd)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
|
||||
if (tenant_id != MTL_ID()) {
|
||||
if (OB_FAIL(guard.switch_to(tenant_id))) {
|
||||
LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*);
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_ISNULL(tmp_file_mgr)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(tmp_file_mgr->remove(fd))) {
|
||||
LOG_WARN("fail to remove", KR(ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantTmpFileManagerWithMTLSwitch::aio_read(const uint64_t tenant_id, const ObTmpFileIOInfo &io_info, ObTmpFileIOHandle &io_handle)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
|
||||
if (tenant_id != MTL_ID()) {
|
||||
if (OB_FAIL(guard.switch_to(tenant_id))) {
|
||||
LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*);
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_ISNULL(tmp_file_mgr)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(tmp_file_mgr->aio_read(io_info, io_handle))) {
|
||||
LOG_WARN("fail to aio read", KR(ret), K(tenant_id), K(io_info));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantTmpFileManagerWithMTLSwitch::aio_pread(const uint64_t tenant_id, const ObTmpFileIOInfo &io_info,
|
||||
const int64_t offset, ObTmpFileIOHandle &io_handle)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
|
||||
if (tenant_id != MTL_ID()) {
|
||||
if (OB_FAIL(guard.switch_to(tenant_id))) {
|
||||
LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*);
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_ISNULL(tmp_file_mgr)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(tmp_file_mgr->aio_pread(io_info, offset, io_handle))) {
|
||||
LOG_WARN("fail to aio pread", KR(ret), K(tenant_id), K(io_info), K(offset));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantTmpFileManagerWithMTLSwitch::read(const uint64_t tenant_id, const ObTmpFileIOInfo &io_info, ObTmpFileIOHandle &io_handle)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
|
||||
if (tenant_id != MTL_ID()) {
|
||||
if (OB_FAIL(guard.switch_to(tenant_id))) {
|
||||
LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*);
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_ISNULL(tmp_file_mgr)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(tmp_file_mgr->read(io_info, io_handle))) {
|
||||
LOG_WARN("fail to read", KR(ret), K(tenant_id), K(io_info));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantTmpFileManagerWithMTLSwitch::pread(const uint64_t tenant_id, const ObTmpFileIOInfo &io_info, const int64_t offset, ObTmpFileIOHandle &io_handle)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
|
||||
if (tenant_id != MTL_ID()) {
|
||||
if (OB_FAIL(guard.switch_to(tenant_id))) {
|
||||
LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*);
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_ISNULL(tmp_file_mgr)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(tmp_file_mgr->pread(io_info, offset, io_handle))) {
|
||||
LOG_WARN("fail to pread", KR(ret), K(tenant_id), K(io_info), K(offset));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantTmpFileManagerWithMTLSwitch::aio_write(const uint64_t tenant_id, const ObTmpFileIOInfo &io_info, ObTmpFileIOHandle &io_handle)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
|
||||
if (tenant_id != MTL_ID()) {
|
||||
if (OB_FAIL(guard.switch_to(tenant_id))) {
|
||||
LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*);
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_ISNULL(tmp_file_mgr)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(tmp_file_mgr->aio_write(io_info, io_handle))) {
|
||||
LOG_WARN("fail to aio write", KR(ret), K(tenant_id), K(io_info));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantTmpFileManagerWithMTLSwitch::write(const uint64_t tenant_id, const ObTmpFileIOInfo &io_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
|
||||
if (tenant_id != MTL_ID()) {
|
||||
if (OB_FAIL(guard.switch_to(tenant_id))) {
|
||||
LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*);
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_ISNULL(tmp_file_mgr)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(tmp_file_mgr->write(io_info))) {
|
||||
LOG_WARN("fail to write", KR(ret), K(tenant_id), K(io_info));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantTmpFileManagerWithMTLSwitch::truncate(const uint64_t tenant_id, const int64_t fd, const int64_t offset)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
|
||||
if (tenant_id != MTL_ID()) {
|
||||
if (OB_FAIL(guard.switch_to(tenant_id))) {
|
||||
LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*);
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_ISNULL(tmp_file_mgr)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(tmp_file_mgr->truncate(fd, offset))) {
|
||||
LOG_WARN("fail to truncate", KR(ret), K(tenant_id), K(fd), K(offset));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantTmpFileManagerWithMTLSwitch::get_tmp_file_size(const uint64_t tenant_id, const int64_t fd, int64_t &file_size)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
|
||||
if (tenant_id != MTL_ID()) {
|
||||
if (OB_FAIL(guard.switch_to(tenant_id))) {
|
||||
LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*);
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_ISNULL(tmp_file_mgr)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(tmp_file_mgr->get_tmp_file_size(fd, file_size))) {
|
||||
LOG_WARN("fail to get tmp file size", KR(ret), K(tenant_id), K(fd));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantTmpFileManagerWithMTLSwitch::get_tmp_file_fds(const uint64_t tenant_id, ObIArray<int64_t> &fd_arr)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
|
||||
if (tenant_id != MTL_ID()) {
|
||||
if (OB_FAIL(guard.switch_to(tenant_id))) {
|
||||
LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*);
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_ISNULL(tmp_file_mgr)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(tmp_file_mgr->get_tmp_file_fds(fd_arr))) {
|
||||
LOG_WARN("fail to get tmp file fds", KR(ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantTmpFileManagerWithMTLSwitch::get_tmp_file_info(const uint64_t tenant_id, const int64_t fd, ObSNTmpFileInfo &tmp_file_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
|
||||
if (tenant_id != MTL_ID()) {
|
||||
if (OB_FAIL(guard.switch_to(tenant_id))) {
|
||||
LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*);
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_ISNULL(tmp_file_mgr)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(tmp_file_mgr->get_tmp_file_info(fd, tmp_file_info))) {
|
||||
LOG_WARN("fail to get tmp file info", KR(ret), K(tenant_id), K(fd));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // end namespace tmp_file
|
||||
} // end namespace oceanbase
|
||||
|
@ -29,7 +29,6 @@ public:
|
||||
ObTenantTmpFileManager(): is_inited_(false) {}
|
||||
~ObTenantTmpFileManager() { destroy(); }
|
||||
static int mtl_init(ObTenantTmpFileManager *&manager);
|
||||
static ObTenantTmpFileManager &get_instance();
|
||||
ObSNTenantTmpFileManager &get_sn_file_manager() { return sn_file_manager_; }
|
||||
blocksstable::ObSSTenantTmpFileManager &get_ss_file_manager() { return ss_file_manager_; }
|
||||
int init();
|
||||
@ -65,7 +64,32 @@ private:
|
||||
blocksstable::ObSSTenantTmpFileManager ss_file_manager_;
|
||||
};
|
||||
|
||||
#define FILE_MANAGER_INSTANCE_V2 (::oceanbase::tmp_file::ObTenantTmpFileManager::get_instance())
|
||||
class ObTenantTmpFileManagerWithMTLSwitch final
|
||||
{
|
||||
public:
|
||||
static ObTenantTmpFileManagerWithMTLSwitch &get_instance();
|
||||
int alloc_dir(const uint64_t tenant_id, int64_t &dir_id);
|
||||
int open(const uint64_t tenant_id, int64_t &fd, const int64_t &dir_id);
|
||||
int remove(const uint64_t tenant_id, const int64_t fd);
|
||||
|
||||
public:
|
||||
int aio_read(const uint64_t tenant_id, const ObTmpFileIOInfo &io_info, ObTmpFileIOHandle &io_handle);
|
||||
int aio_pread(const uint64_t tenant_id, const ObTmpFileIOInfo &io_info, const int64_t offset, ObTmpFileIOHandle &io_handle);
|
||||
int read(const uint64_t tenant_id, const ObTmpFileIOInfo &io_info, ObTmpFileIOHandle &io_handle);
|
||||
int pread(const uint64_t tenant_id, const ObTmpFileIOInfo &io_info, const int64_t offset, ObTmpFileIOHandle &io_handle);
|
||||
// NOTE:
|
||||
// only support append write.
|
||||
int aio_write(const uint64_t tenant_id, const ObTmpFileIOInfo &io_info, ObTmpFileIOHandle &io_handle);
|
||||
// NOTE:
|
||||
// only support append write.
|
||||
int write(const uint64_t tenant_id, const ObTmpFileIOInfo &io_info);
|
||||
int truncate(const uint64_t tenant_id, const int64_t fd, const int64_t offset);
|
||||
int get_tmp_file_size(const uint64_t tenant_id, const int64_t fd, int64_t &file_size);
|
||||
int get_tmp_file_fds(const uint64_t tenant_id, ObIArray<int64_t> &fd_arr);
|
||||
int get_tmp_file_info(const uint64_t tenant_id, const int64_t fd, ObSNTmpFileInfo &tmp_file_info);
|
||||
};
|
||||
|
||||
#define FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH (::oceanbase::tmp_file::ObTenantTmpFileManagerWithMTLSwitch::get_instance())
|
||||
} // end namespace tmp_file
|
||||
} // end namespace oceanbase
|
||||
|
||||
|
@ -23,7 +23,7 @@ namespace oceanbase
|
||||
namespace tmp_file
|
||||
{
|
||||
ObTmpFileWBPIndexCache::ObTmpFileWBPIndexCache() :
|
||||
ObTmpFileCircleArray(), bucket_array_allocator_(), bucket_allocator_(),
|
||||
ObTmpFileCircleArray(), bucket_array_allocator_(nullptr), bucket_allocator_(nullptr),
|
||||
page_buckets_(nullptr) ,
|
||||
fd_(ObTmpFileGlobal::INVALID_TMP_FILE_FD), wbp_(nullptr),
|
||||
max_bucket_array_capacity_(MAX_BUCKET_ARRAY_CAPACITY) {}
|
||||
|
@ -906,9 +906,9 @@ TEST_F(TestChunkDatumStore, test_only_disk_data)
|
||||
int64_t rows = round * cnt;
|
||||
LOG_INFO("starting write disk test: append rows", K(rows));
|
||||
ObChunkDatumStore rs("TEST");
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
ObChunkDatumStore::Iterator it;
|
||||
ASSERT_EQ(OB_SUCCESS, rs.init(0, tenant_id_, ctx_id_, label_));
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
rs.set_mem_limit(1L << 30);
|
||||
// disk data
|
||||
CALL(append_rows, rs, cnt);
|
||||
@ -933,9 +933,9 @@ TEST_F(TestChunkDatumStore, test_only_disk_data1)
|
||||
int64_t rows = round * cnt;
|
||||
LOG_INFO("starting write disk test: append rows", K(rows));
|
||||
ObChunkDatumStore rs("TEST");
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
ObChunkDatumStore::Iterator it;
|
||||
ASSERT_EQ(OB_SUCCESS, rs.init(0, tenant_id_, ctx_id_, label_));
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
rs.set_mem_limit(1L << 30);
|
||||
// disk data
|
||||
CALL(append_rows, rs, cnt);
|
||||
@ -994,10 +994,11 @@ TEST_F(TestChunkDatumStore, test_append_block)
|
||||
|
||||
//recv
|
||||
ObChunkDatumStore rs2("TEST");
|
||||
rs2.alloc_dir_id();
|
||||
ObChunkDatumStore::Block *block2 = reinterpret_cast<ObChunkDatumStore::Block *>(mem2);
|
||||
ret = rs2.init(0, tenant_id_, ctx_id_, label_);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ret = rs2.alloc_dir_id();
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < 100; ++i) {
|
||||
ret = rs2.append_block(block->get_buffer()->data(), block->get_buffer()->data_size(), true);
|
||||
}
|
||||
|
@ -210,13 +210,13 @@ public:
|
||||
{
|
||||
ObArenaAllocator alloc(ObModIds::OB_MODULE_PAGE_ALLOCATOR, 2 << 20);
|
||||
ObChunkRowStore rs(&alloc);
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
int64_t v = 0;
|
||||
int64_t i;
|
||||
int64_t begin = ObTimeUtil::current_time();
|
||||
int ret = OB_SUCCESS;
|
||||
ret = rs.init(0, tenant_id_, ctx_id_, label_);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
rs.set_block_size(block_size);
|
||||
begin = ObTimeUtil::current_time();
|
||||
for (i = 0; i < rows; i++) {
|
||||
@ -337,10 +337,10 @@ TEST_F(TestChunkRowStore, multi_iter)
|
||||
int64_t i = 0;
|
||||
int64_t j = 0;
|
||||
ObChunkRowStore rs;
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
|
||||
ret = rs.init(1 << 20, tenant_id_, ctx_id_, label_);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
|
||||
LOG_INFO("starting basic test: append 3000 rows");
|
||||
CALL(append_rows, rs, total); // approximate 1MB, no need to dump
|
||||
@ -380,9 +380,9 @@ TEST_F(TestChunkRowStore, multi_iter)
|
||||
TEST_F(TestChunkRowStore, keep_projector0)
|
||||
{
|
||||
ObChunkRowStore rs(NULL);
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
ASSERT_EQ(OB_SUCCESS, rs.init(100 << 20, OB_SERVER_TENANT_ID, ObCtxIds::DEFAULT_CTX_ID,
|
||||
ASSERT_EQ(OB_SUCCESS, rs.init(100 << 20, tenant_id_, ObCtxIds::DEFAULT_CTX_ID,
|
||||
common::ObModIds::OB_SQL_CHUNK_ROW_STORE, true));
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
const int64_t OBJ_CNT = 3;
|
||||
ObObj objs[OBJ_CNT];
|
||||
ObNewRow r;
|
||||
@ -438,10 +438,10 @@ TEST_F(TestChunkRowStore, keep_projector0)
|
||||
TEST_F(TestChunkRowStore, keep_projector2)
|
||||
{
|
||||
ObChunkRowStore rs(NULL);
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
ASSERT_EQ(OB_SUCCESS, rs.init(100 << 20, OB_SERVER_TENANT_ID, ObCtxIds::DEFAULT_CTX_ID,
|
||||
ASSERT_EQ(OB_SUCCESS, rs.init(100 << 20, tenant_id_, ObCtxIds::DEFAULT_CTX_ID,
|
||||
common::ObModIds::OB_SQL_CHUNK_ROW_STORE, true,
|
||||
ObChunkRowStore::STORE_MODE::FULL));
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
const int64_t OBJ_CNT = 3;
|
||||
ObObj objs[OBJ_CNT];
|
||||
ObNewRow r;
|
||||
@ -495,15 +495,15 @@ TEST_F(TestChunkRowStore, keep_projector2)
|
||||
TEST_F(TestChunkRowStore, keep_projector2_with_copy)
|
||||
{
|
||||
ObChunkRowStore rs(NULL);
|
||||
ASSERT_EQ(OB_SUCCESS, rs.init(100 << 20, tenant_id_, ObCtxIds::DEFAULT_CTX_ID,
|
||||
common::ObModIds::OB_SQL_CHUNK_ROW_STORE, true,
|
||||
ObChunkRowStore::STORE_MODE::FULL));
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
ASSERT_EQ(OB_SUCCESS, rs.init(100 << 20, OB_SERVER_TENANT_ID, ObCtxIds::DEFAULT_CTX_ID,
|
||||
common::ObModIds::OB_SQL_CHUNK_ROW_STORE, true,
|
||||
ObChunkRowStore::STORE_MODE::FULL));
|
||||
ObChunkRowStore rs2(NULL);
|
||||
ASSERT_EQ(OB_SUCCESS, rs2.alloc_dir_id());
|
||||
ASSERT_EQ(OB_SUCCESS, rs2.init(100 << 20, OB_SERVER_TENANT_ID, ObCtxIds::DEFAULT_CTX_ID,
|
||||
ASSERT_EQ(OB_SUCCESS, rs2.init(100 << 20, tenant_id_, ObCtxIds::DEFAULT_CTX_ID,
|
||||
common::ObModIds::OB_SQL_CHUNK_ROW_STORE, true,
|
||||
ObChunkRowStore::STORE_MODE::FULL));
|
||||
ASSERT_EQ(OB_SUCCESS, rs2.alloc_dir_id());
|
||||
const int64_t OBJ_CNT = 3;
|
||||
ObObj objs[OBJ_CNT];
|
||||
ObNewRow r;
|
||||
@ -555,11 +555,11 @@ TEST_F(TestChunkRowStore, basic2)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObChunkRowStore rs;
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
ObChunkRowStore::Iterator it;
|
||||
//mem limit 5M
|
||||
ret = rs.init(5L << 20, tenant_id_, ctx_id_, label_);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
CALL(append_rows, rs, 100000);
|
||||
ASSERT_GT(rs.get_mem_hold(), 0);
|
||||
ASSERT_GT(rs.get_file_size(), 0);
|
||||
@ -577,12 +577,12 @@ TEST_F(TestChunkRowStore, chunk_iterator)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObChunkRowStore rs;
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
ObChunkRowStore::ChunkIterator chunk_it;
|
||||
ObChunkRowStore::RowIterator it;
|
||||
//mem limit 5M
|
||||
ret = rs.init(5L << 20, tenant_id_, ctx_id_, label_);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
CALL(append_rows, rs, 100000);
|
||||
ASSERT_GT(rs.get_mem_hold(), 0);
|
||||
ASSERT_GT(rs.get_file_size(), 0);
|
||||
@ -636,13 +636,13 @@ TEST_F(TestChunkRowStore, test_copy_row)
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t rows = 1000;
|
||||
ObChunkRowStore rs;
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
ObChunkRowStore::Iterator it;
|
||||
const ObChunkRowStore::StoredRow *sr;
|
||||
LOG_INFO("starting mem_perf test: append rows", K(rows));
|
||||
int64_t begin = ObTimeUtil::current_time();
|
||||
ret = rs.init(0, tenant_id_, ctx_id_, label_);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
CALL(append_rows, rs, rows);
|
||||
ASSERT_EQ(OB_SUCCESS, rs.begin(it));
|
||||
ASSERT_EQ(OB_SUCCESS, it.get_next_row(sr));
|
||||
@ -653,12 +653,12 @@ TEST_F(TestChunkRowStore, mem_perf)
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t rows = 2000000;
|
||||
ObChunkRowStore rs;
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
ObChunkRowStore::Iterator it;
|
||||
LOG_INFO("starting mem_perf test: append rows", K(rows));
|
||||
int64_t begin = ObTimeUtil::current_time();
|
||||
ret = rs.init(0, tenant_id_, ctx_id_, label_);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
CALL(append_rows, rs, rows);
|
||||
LOG_WARN("write time:", K(rows), K(ObTimeUtil::current_time() - begin));
|
||||
CALL(verify_n_rows, rs, it, 10000, true);
|
||||
@ -748,12 +748,12 @@ TEST_F(TestOARowStore, disk_time_cmp)
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t rows = 2000000;
|
||||
ObChunkRowStore rs;
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
int64_t v = 0;
|
||||
int64_t i;
|
||||
int64_t begin = ObTimeUtil::current_time();
|
||||
ret = rs.init(1 << 20, tenant_id_, ctx_id_, label_);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
begin = ObTimeUtil::current_time();
|
||||
for (int64_t i = 0; i < rows; i++) {
|
||||
ObNewRow &row = gen_row(i);
|
||||
@ -812,9 +812,9 @@ TEST_F(TestChunkRowStore, disk)
|
||||
int64_t rows = round * 10000;
|
||||
LOG_INFO("starting write disk test: append rows", K(rows));
|
||||
ObChunkRowStore rs;
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
ObChunkRowStore::Iterator it;
|
||||
ASSERT_EQ(OB_SUCCESS, rs.init(0, tenant_id_, ctx_id_, label_));
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
rs.set_mem_limit(100L << 20);
|
||||
for (int64_t i = 0; i < round; i++) {
|
||||
if (i == round / 2) {
|
||||
@ -849,9 +849,9 @@ TEST_F(TestChunkRowStore, disk_with_chunk)
|
||||
int64_t rows = round * cnt;
|
||||
LOG_INFO("starting write disk test: append rows", K(rows));
|
||||
ObChunkRowStore rs;
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
ObChunkRowStore::Iterator it;
|
||||
ASSERT_EQ(OB_SUCCESS, rs.init(0, tenant_id_, ctx_id_, label_));
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
rs.set_mem_limit(1L << 20);
|
||||
for (int64_t i = 0; i < round; i++) {
|
||||
//if (i == round / 2) {
|
||||
@ -920,9 +920,9 @@ TEST_F(TestChunkRowStore, disk_with_chunk)
|
||||
// LOG_INFO("starting dump mem test: append rows", K(500000));
|
||||
// int ret = OB_SUCCESS;
|
||||
// ObChunkRowStore rs;
|
||||
// ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
// ObChunkRowStore::Iterator it;
|
||||
// ret = rs.init(0, tenant_id_, ctx_id_, mod_id_);
|
||||
// ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
// CALL(append_rows, rs, 500000);
|
||||
// int64_t avg_row_size = rs.get_mem_hold() / rs.get_row_cnt();
|
||||
// LOG_WARN("average row size", K(avg_row_size));
|
||||
@ -1005,7 +1005,6 @@ TEST_F(TestChunkRowStore, test_add_block)
|
||||
int ret = OB_SUCCESS;
|
||||
//send
|
||||
ObChunkRowStore rs;
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
ObChunkRowStore::Block *block;
|
||||
ObArenaAllocator alloc(ObModIds::OB_MODULE_PAGE_ALLOCATOR, 2 << 20);
|
||||
|
||||
@ -1014,6 +1013,7 @@ TEST_F(TestChunkRowStore, test_add_block)
|
||||
|
||||
ret = rs.init(0, tenant_id_, ctx_id_, label_);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
|
||||
int64_t min_size = rs.min_blk_size(row_size);
|
||||
void *mem = alloc.alloc(min_size);
|
||||
@ -1037,10 +1037,10 @@ TEST_F(TestChunkRowStore, test_add_block)
|
||||
|
||||
//recv
|
||||
ObChunkRowStore rs2;
|
||||
ASSERT_EQ(OB_SUCCESS, rs2.alloc_dir_id());
|
||||
ObChunkRowStore::Block *block2 = reinterpret_cast<ObChunkRowStore::Block *>(mem2);
|
||||
ret = rs2.init(0, tenant_id_, ctx_id_, label_);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(OB_SUCCESS, rs2.alloc_dir_id());
|
||||
ret = rs2.add_block(block2, true);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
@ -1064,10 +1064,10 @@ TEST_F(TestChunkRowStore, row_extend_row)
|
||||
int64_t rows = round * 10000;
|
||||
LOG_INFO("starting write disk test: append rows", K(rows));
|
||||
ObChunkRowStore rs;
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
ObChunkRowStore::Iterator it;
|
||||
ASSERT_EQ(OB_SUCCESS,
|
||||
rs.init(0, tenant_id_, ctx_id_, label_, true, ObChunkRowStore::WITHOUT_PROJECTOR, 8));
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
LOG_INFO("starting basic test: append 3000 rows");
|
||||
int64_t ret = OB_SUCCESS;
|
||||
int64_t base = rs.get_row_cnt();
|
||||
@ -1113,9 +1113,9 @@ TEST_F(TestChunkRowStore, test_both_disk_and_memory)
|
||||
int64_t rows = round * cnt;
|
||||
LOG_INFO("starting write disk test: append rows", K(rows));
|
||||
ObChunkRowStore rs;
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
ObChunkRowStore::Iterator it;
|
||||
ASSERT_EQ(OB_SUCCESS, rs.init(0, tenant_id_, ctx_id_, label_));
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
rs.set_mem_limit(1L << 30);
|
||||
// disk data
|
||||
CALL(append_rows, rs, cnt);
|
||||
@ -1140,9 +1140,9 @@ TEST_F(TestChunkRowStore, test_only_disk_data)
|
||||
int64_t rows = round * cnt;
|
||||
LOG_INFO("starting write disk test: append rows", K(rows));
|
||||
ObChunkRowStore rs;
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
ObChunkRowStore::Iterator it;
|
||||
ASSERT_EQ(OB_SUCCESS, rs.init(0, tenant_id_, ctx_id_, label_));
|
||||
ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id());
|
||||
rs.set_mem_limit(1L << 30);
|
||||
// disk data
|
||||
CALL(append_rows, rs, cnt);
|
||||
|
@ -24,7 +24,7 @@ using namespace common;
|
||||
TEST(RARowStore, keep_projector)
|
||||
{
|
||||
ObRARowStore rs(NULL, true);
|
||||
ASSERT_EQ(OB_SUCCESS, rs.init(100 << 20));
|
||||
ASSERT_EQ(OB_SUCCESS, rs.init(100 << 20, OB_SYS_TENANT_ID));
|
||||
const int64_t OBJ_CNT = 3;
|
||||
ObObj objs[OBJ_CNT];
|
||||
ObNewRow r;
|
||||
@ -80,7 +80,7 @@ TEST(RARowStore, alloc_project_fail)
|
||||
{
|
||||
ObEmptyAlloc alloc;
|
||||
ObRARowStore rs(&alloc, true);
|
||||
ASSERT_EQ(OB_SUCCESS, rs.init(100 << 20));
|
||||
ASSERT_EQ(OB_SUCCESS, rs.init(100 << 20, OB_SYS_TENANT_ID));
|
||||
const int64_t OBJ_CNT = 3;
|
||||
ObObj objs[OBJ_CNT];
|
||||
ObNewRow r;
|
||||
|
@ -371,7 +371,7 @@ int TestParallelExternalSort::build_reader(const ObVector<TestItem *> &items, co
|
||||
ObFragmentWriterV2<TestItem> writer;
|
||||
int64_t dir_id = -1;
|
||||
std::sort(items.begin(), items.end(), compare);
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.alloc_dir(dir_id))) {
|
||||
if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.alloc_dir(OB_SYS_TENANT_ID, dir_id))) {
|
||||
COMMON_LOG(WARN, "fail to allocate file directory", K(ret));
|
||||
} else if (OB_FAIL(writer.open(buf_cap, expire_timestamp, OB_SYS_TENANT_ID, dir_id))) {
|
||||
COMMON_LOG(WARN, "fail to open writer", K(ret));
|
||||
@ -682,7 +682,7 @@ TEST_F(TestParallelExternalSort, test_writer)
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t dir_id = -1;
|
||||
|
||||
ret = FILE_MANAGER_INSTANCE_V2.alloc_dir(dir_id);
|
||||
ret = FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.alloc_dir(OB_SYS_TENANT_ID, dir_id);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
// single macro buffer, total write bytes is less than single macro buffer length
|
||||
ret = writer.open(buf_cap, expire_timestamp, OB_SYS_TENANT_ID, dir_id);
|
||||
@ -739,7 +739,7 @@ TEST_F(TestParallelExternalSort, test_reader)
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t dir_id = -1;
|
||||
|
||||
ret = FILE_MANAGER_INSTANCE_V2.alloc_dir(dir_id);
|
||||
ret = FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.alloc_dir(OB_SYS_TENANT_ID, dir_id);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
// single macro buffer, total write bytes is less than single macro buffer length
|
||||
ret = writer.open(buf_cap, expire_timestamp, OB_SYS_TENANT_ID, dir_id);
|
||||
|
Loading…
x
Reference in New Issue
Block a user