[CP] enable size > 2M lob direct load

This commit is contained in:
coolfishchen 2024-02-19 07:29:20 +00:00 committed by ob-robot
parent d4b1afb76e
commit eebca9309f
16 changed files with 241 additions and 171 deletions

View File

@ -823,6 +823,19 @@ int ObTmpTenantMacroBlockManager::free_macro_block(const int64_t block_id)
return ret;
}
int ObTmpTenantMacroBlockManager::get_disk_macro_block_count(int64_t &count) const
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "ObTmpMacroBlockManager has not been inited", K(ret));
} else {
count = blocks_.size();
}
return ret;
}
int ObTmpTenantMacroBlockManager::get_disk_macro_block_list(
common::ObIArray<MacroBlockId> &macro_id_list)
@ -1511,6 +1524,19 @@ int ObTmpTenantFileStore::wait_write_finish(const int64_t block_id, const int64_
return ret;
}
int ObTmpTenantFileStore::get_disk_macro_block_count(int64_t &count) const
{
int ret = OB_SUCCESS;
SpinRLockGuard guard(lock_);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "ObTmpTenantFileStore has not been inited", K(ret));
} else if (OB_FAIL(tmp_block_manager_.get_disk_macro_block_count(count))) {
STORAGE_LOG(WARN, "fail to get disk macro block count from tmp_block_manager_", K(ret));
}
return ret;
}
int ObTmpTenantFileStore::get_disk_macro_block_list(common::ObIArray<MacroBlockId> &macro_id_list)
{
int ret = OB_SUCCESS;
@ -1843,22 +1869,20 @@ int ObTmpFileStore::get_macro_block_list(ObIArray<TenantTmpBlockCntPair> &tmp_bl
STORAGE_LOG(WARN, "ObTmpFileStore has not been inited", K(ret));
} else {
tmp_block_cnt_pairs.reset();
common::ObSEArray<MacroBlockId, 64> macro_id_list;
macro_id_list.set_attr(ObMemAttr(MTL_ID(), "TMP_MB_LIST"));
TenantFileStoreMap::iterator iter;
ObTmpTenantFileStore *tmp = NULL;
for (iter = tenant_file_stores_.begin(); OB_SUCC(ret) && iter != tenant_file_stores_.end();
++iter) {
int64_t macro_id_count = 0;
TenantTmpBlockCntPair pair;
macro_id_list.reset();
if (OB_ISNULL(tmp = iter->second.get_tenant_store())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "fail to iterate tmp tenant file store", K(ret));
} else if (OB_FAIL(tmp->get_disk_macro_block_list(macro_id_list))){
} else if (OB_FAIL(tmp->get_disk_macro_block_count(macro_id_count))){
STORAGE_LOG(WARN, "fail to get list of tenant macro block in disk", K(ret));
} else if (OB_FAIL(pair.init(iter->first, macro_id_list.count()))) {
} else if (OB_FAIL(pair.init(iter->first, macro_id_count))) {
STORAGE_LOG(WARN, "fail to init tenant tmp block count pair", K(ret), "tenant id",
iter->first, "macro block count", macro_id_list.count());
iter->first, "macro block count", macro_id_count);
} else if (OB_FAIL(tmp_block_cnt_pairs.push_back(pair))) {
STORAGE_LOG(WARN, "fail to push back tmp_block_cnt_pairs", K(ret), K(pair));
}

View File

@ -223,6 +223,7 @@ public:
int alloc_macro_block(const int64_t dir_id, const uint64_t tenant_id, ObTmpMacroBlock *&t_mblk);
int free_macro_block(const int64_t block_id);
int get_macro_block(const int64_t block_id, ObTmpMacroBlock *&t_mblk);
int get_disk_macro_block_count(int64_t &count) const;
int get_disk_macro_block_list(common::ObIArray<MacroBlockId> &macro_id_list);
void print_block_usage();
@ -253,6 +254,7 @@ public:
void refresh_memory_limit(const uint64_t tenant_id);
int sync_block(const int64_t block_id, ObTmpTenantMemBlockManager::ObIOWaitInfoHandle &handle);
int wait_write_finish(const int64_t block_id, const int64_t timeout_ms);
int get_disk_macro_block_count(int64_t &count) const;
int get_disk_macro_block_list(common::ObIArray<MacroBlockId> &macro_id_list);
int get_macro_block(const int64_t block_id, ObTmpMacroBlock *&t_mblk);
// use io_allocator_ to allocate tenant extent memory.

View File

@ -43,11 +43,13 @@ public:
OB_INLINE int64_t get_end_pos() const { return buf_size_; }
TO_STRING_KV(K_(header), K_(compressor_type), KP_(compressor), KP_(buf), K_(buf_size), K_(pos),
KP_(decompress_buf), KP_(decompress_buf_size));
protected:
int realloc_decompress_buf(const int64_t size);
protected:
Header header_;
common::ObCompressorType compressor_type_;
common::ObCompressor *compressor_;
common::ObArenaAllocator allocator_;
int64_t data_block_size_;
char *buf_;
int64_t buf_size_;
int64_t pos_;
@ -61,7 +63,7 @@ template <typename Header>
ObDirectLoadDataBlockDecoder<Header>::ObDirectLoadDataBlockDecoder()
: compressor_type_(ObCompressorType::INVALID_COMPRESSOR),
compressor_(nullptr),
allocator_("TLD_DBDecoder"),
data_block_size_(0),
buf_(nullptr),
buf_size_(0),
pos_(0),
@ -92,12 +94,15 @@ void ObDirectLoadDataBlockDecoder<Header>::reset()
header_.reset();
compressor_type_ = common::ObCompressorType::INVALID_COMPRESSOR;
compressor_ = nullptr;
data_block_size_ = 0;
buf_ = nullptr;
buf_size_ = 0;
pos_ = 0;
decompress_buf_ = nullptr;
if (decompress_buf_ != nullptr) {
ob_free(decompress_buf_);
decompress_buf_ = nullptr;
}
decompress_buf_size_ = 0;
allocator_.reset();
is_inited_ = false;
}
@ -114,28 +119,41 @@ int ObDirectLoadDataBlockDecoder<Header>::init(int64_t data_block_size,
ret = common::OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid args", KR(ret), K(data_block_size), K(compressor_type));
} else {
allocator_.set_tenant_id(MTL_ID());
if (common::ObCompressorType::NONE_COMPRESSOR != compressor_type) {
char *buf = nullptr;
if (OB_ISNULL(buf = static_cast<char *>(allocator_.alloc(data_block_size)))) {
ret = common::OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "fail to alloc buf", KR(ret), K(data_block_size));
} else if (OB_FAIL(common::ObCompressorPool::get_instance().get_compressor(compressor_type,
if (OB_FAIL(common::ObCompressorPool::get_instance().get_compressor(compressor_type,
compressor_))) {
STORAGE_LOG(WARN, "fail to get compressor, ", KR(ret), K(compressor_type));
} else {
decompress_buf_ = buf;
decompress_buf_size_ = data_block_size;
}
}
if (OB_SUCC(ret)) {
compressor_type_ = compressor_type;
data_block_size_ = data_block_size;
is_inited_ = true;
}
}
return ret;
}
template <typename Header>
int ObDirectLoadDataBlockDecoder<Header>::realloc_decompress_buf(const int64_t size)
{
int ret = OB_SUCCESS;
if (decompress_buf_size_ != size) {
if (decompress_buf_ != nullptr) {
ob_free(decompress_buf_);
decompress_buf_ = nullptr;
}
decompress_buf_ = (char *)ob_malloc(size, ObMemAttr(MTL_ID(), "TLD_DBDecoder"));
if (decompress_buf_ == nullptr) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "fail to alloc mem", KR(ret), K(size));
} else {
decompress_buf_size_ = size;
}
}
return ret;
}
template <typename Header>
int ObDirectLoadDataBlockDecoder<Header>::prepare_data_block(char *buf, int64_t buf_size,
int64_t &data_size)
@ -175,7 +193,18 @@ int ObDirectLoadDataBlockDecoder<Header>::prepare_data_block(char *buf, int64_t
// do decompress
if (OB_SUCC(ret) && header_.occupy_size_ != header_.data_size_) {
int64_t decompress_size = 0;
if (OB_UNLIKELY(common::ObCompressorType::NONE_COMPRESSOR == compressor_type_)) {
if (header_.data_size_ > data_block_size_) {
if (OB_FAIL(realloc_decompress_buf(header_.data_size_))) {
STORAGE_LOG(WARN, "fail to realloc_decompress_buf", KR(ret));
}
} else {
if (OB_FAIL(realloc_decompress_buf(data_block_size_))) {
STORAGE_LOG(WARN, "fail to realloc_decompress_buf", KR(ret));
}
}
if (OB_FAIL(ret)) {
//pass
} else if (OB_UNLIKELY(common::ObCompressorType::NONE_COMPRESSOR == compressor_type_)) {
ret = common::OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "unexpected compressor type", KR(ret));
} else if (OB_FAIL(compressor_->decompress(buf + pos_, header_.occupy_size_ - pos_,

View File

@ -40,16 +40,16 @@ public:
int64_t get_pos() const { return pos_; }
Header &get_header() { return header_; }
int build_data_block(char *&buf, int64_t &buf_size);
template <typename T>
int build_data_block(const T &item, char *buf, int64_t buf_size, int64_t &data_size);
TO_STRING_KV(K_(header), K_(header_size), K_(compressor_type), KP_(compressor), KP_(buf),
K_(buf_size), K_(pos), KP_(compress_buf), K_(compress_buf_size));
protected:
int realloc_bufs(const int64_t size);
protected:
Header header_;
int64_t header_size_;
common::ObCompressorType compressor_type_;
common::ObCompressor *compressor_;
common::ObArenaAllocator allocator_;
int64_t data_block_size_;
char *buf_;
int64_t buf_size_; // buf capacity
int64_t pos_;
@ -64,7 +64,7 @@ ObDirectLoadDataBlockEncoder<Header>::ObDirectLoadDataBlockEncoder()
: header_size_(0),
compressor_type_(common::ObCompressorType::INVALID_COMPRESSOR),
compressor_(nullptr),
allocator_("TLD_DBEncoder"),
data_block_size_(0),
buf_(nullptr),
buf_size_(0),
pos_(0),
@ -94,15 +94,79 @@ void ObDirectLoadDataBlockEncoder<Header>::reset()
header_size_ = 0;
compressor_type_ = common::ObCompressorType::INVALID_COMPRESSOR;
compressor_ = nullptr;
buf_ = nullptr;
data_block_size_ = 0;
if (buf_ != nullptr) {
ob_free(buf_);
buf_ = nullptr;
}
buf_size_ = 0;
pos_ = 0;
compress_buf_ = nullptr;
if (compress_buf_ != nullptr) {
ob_free(compress_buf_);
compress_buf_ = nullptr;
}
compress_buf_size_ = 0;
allocator_.reset();
is_inited_ = false;
}
template <typename Header>
int ObDirectLoadDataBlockEncoder<Header>::realloc_bufs(const int64_t size)
{
int ret = OB_SUCCESS;
int64_t align_size = ALIGN_UP(size, DIO_ALIGN_SIZE);
if (buf_size_ != align_size) {
char *tmp_buf = (char *)ob_malloc(align_size, ObMemAttr(MTL_ID(), "TLD_DBEncoder"));
if (tmp_buf == nullptr) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "fail to alloc buf", K(align_size), KR(ret));
}
if (OB_SUCC(ret) && buf_ != nullptr && pos_ > 0) {
if (pos_ > align_size) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pos is bigger than buf align_size", K(pos_), K(align_size), KR(ret));
} else {
MEMCPY(tmp_buf, buf_, pos_);
}
}
if (OB_SUCC(ret)) {
if (buf_ != nullptr) {
ob_free(buf_);
}
buf_ = tmp_buf;
buf_size_ = align_size;
}
}
if (compressor_ != nullptr) {
int64_t max_overflow_size = 0;
int64_t compress_buf_size = 0;
if (OB_SUCC(ret)) {
if (OB_FAIL(compressor_->get_max_overflow_size(size, max_overflow_size))) {
STORAGE_LOG(WARN, "fail to get max_overflow_size", KR(ret), K(size), K(max_overflow_size));
} else {
compress_buf_size = ALIGN_UP(size + max_overflow_size, DIO_ALIGN_SIZE);
}
}
if (OB_SUCC(ret) && compress_buf_size_ != compress_buf_size) {
if (compress_buf_ != nullptr) {
ob_free(compress_buf_);
compress_buf_ = nullptr;
}
compress_buf_ = (char *)ob_malloc(compress_buf_size, ObMemAttr(MTL_ID(), "TLD_DBEncoder"));
if (compress_buf_ == nullptr) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "fail to alloc compress buf", K(compress_buf_size), KR(ret));
} else {
compress_buf_size_ = compress_buf_size;
}
}
}
return ret;
}
template <typename Header>
int ObDirectLoadDataBlockEncoder<Header>::init(int64_t data_block_size,
common::ObCompressorType compressor_type)
@ -116,28 +180,14 @@ int ObDirectLoadDataBlockEncoder<Header>::init(int64_t data_block_size,
ret = common::OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid args", KR(ret), K(data_block_size), K(compressor_type));
} else {
allocator_.set_tenant_id(MTL_ID());
const int64_t alloc_buf_size =
data_block_size +
(compressor_type != common::ObCompressorType::NONE_COMPRESSOR ? data_block_size : 0);
char *buf = nullptr;
if (OB_ISNULL(buf = static_cast<char *>(allocator_.alloc(alloc_buf_size)))) {
ret = common::OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "fail to alloc buf", KR(ret), K(alloc_buf_size));
} else if (common::ObCompressorType::NONE_COMPRESSOR != compressor_type &&
if (common::ObCompressorType::NONE_COMPRESSOR != compressor_type &&
OB_FAIL(common::ObCompressorPool::get_instance().get_compressor(compressor_type,
compressor_))) {
STORAGE_LOG(WARN, "fail to get compressor, ", KR(ret), K(compressor_type));
} else {
header_size_ = header_.get_serialize_size();
compressor_type_ = compressor_type;
buf_ = buf;
buf_size_ = data_block_size;
pos_ = header_size_;
if (ObCompressorType::NONE_COMPRESSOR != compressor_type_) {
compress_buf_ = buf + data_block_size;
compress_buf_size_ = data_block_size;
}
data_block_size_ = data_block_size;
is_inited_ = true;
}
}
@ -150,12 +200,28 @@ int ObDirectLoadDataBlockEncoder<Header>::write_item(const T &item)
{
int ret = common::OB_SUCCESS;
const int64_t item_size = item.get_serialize_size();
if (item_size > buf_size_ - header_size_) {
ret = common::OB_SIZE_OVERFLOW;
} else if (item.get_serialize_size() + pos_ > buf_size_) {
ret = common::OB_BUF_NOT_ENOUGH;
} else if (OB_FAIL(item.serialize(buf_, buf_size_, pos_))) {
STORAGE_LOG(WARN, "fail to serialize item", KR(ret));
// 没有分配内存,和内存太大,都需要重新分配内存
if (item_size + pos_ < data_block_size_ || buf_size_ < data_block_size_) {
if (OB_FAIL(realloc_bufs(data_block_size_))) {
STORAGE_LOG(WARN, "fail to realloc bufs", KR(ret));
}
}
if (OB_SUCC(ret)) {
if (item_size > data_block_size_ - header_size_ && item_size > buf_size_ - header_size_) {
if (OB_FAIL(realloc_bufs(item_size + header_size_))) {
STORAGE_LOG(WARN, "fail to realloc bufs", KR(ret));
}
}
}
if (OB_SUCC(ret)) {
if (item_size + pos_ > buf_size_) {
ret = common::OB_BUF_NOT_ENOUGH;
} else if (OB_FAIL(item.serialize(buf_, buf_size_, pos_))) {
STORAGE_LOG(WARN, "fail to serialize item", KR(ret));
}
}
return ret;
}
@ -204,40 +270,5 @@ int ObDirectLoadDataBlockEncoder<Header>::build_data_block(char *&buf, int64_t &
return ret;
}
template <typename Header>
template <typename T>
int ObDirectLoadDataBlockEncoder<Header>::build_data_block(const T &item, char *buf,
int64_t buf_size, int64_t &data_size)
{
int ret = common::OB_SUCCESS;
if (IS_NOT_INIT) {
ret = common::OB_NOT_INIT;
STORAGE_LOG(WARN, "ObDirectLoadDataBlockEncoder not init", KR(ret), KP(this));
} else if (OB_UNLIKELY(nullptr == buf || buf_size <= 0)) {
ret = common::OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid args", KR(ret), KP(buf), K(buf_size));
} else if (OB_UNLIKELY(item.get_serialize_size() + header_size_ > buf_size)) {
ret = common::OB_BUF_NOT_ENOUGH;
STORAGE_LOG(WARN, "buf not enough", KR(ret), K(buf_size), K(item.get_serialize_size()));
} else {
// serialize item
if (OB_FAIL(item.serialize(buf, buf_size, pos_))) {
STORAGE_LOG(WARN, "fail to serialize item", KR(ret));
}
// serialize header
else {
int64_t pos = 0;
data_size = pos_;
header_.data_size_ = data_size;
header_.occupy_size_ = data_size;
header_.checksum_ = ob_crc64_sse42(0, buf + header_size_, data_size - header_size_);
if (OB_FAIL(header_.serialize(buf, buf_size, pos))) {
STORAGE_LOG(WARN, "fail to serialize header", KR(ret));
}
}
}
return ret;
}
} // namespace storage
} // namespace oceanbase

View File

@ -30,7 +30,7 @@ public:
virtual ~ObDirectLoadDataBlockReader();
void reuse();
void reset();
int init(int64_t data_block_size, int64_t buf_size, common::ObCompressorType compressor_type);
int init(int64_t data_block_size, common::ObCompressorType compressor_type);
int open(const ObDirectLoadTmpFileHandle &file_handle, int64_t offset, int64_t size);
int get_next_item(const T *&item) override;
OB_INLINE int64_t get_block_count() const { return block_count_; }
@ -39,8 +39,8 @@ protected:
private:
int read_next_buffer();
int switch_next_block();
int realloc_buf(int64_t size);
protected:
common::ObArenaAllocator allocator_;
int64_t data_block_size_;
char *buf_;
int64_t buf_capacity_;
@ -60,8 +60,7 @@ protected:
template <typename Header, typename T>
ObDirectLoadDataBlockReader<Header, T>::ObDirectLoadDataBlockReader()
: allocator_("TLD_DBReader"),
data_block_size_(0),
: data_block_size_(0),
buf_(nullptr),
buf_capacity_(0),
buf_size_(0),
@ -99,12 +98,14 @@ template <typename Header, typename T>
void ObDirectLoadDataBlockReader<Header, T>::reset()
{
data_block_size_ = 0;
buf_ = nullptr;
if (buf_ != nullptr) {
ob_free(buf_);
buf_ = nullptr;
}
buf_capacity_ = 0;
buf_size_ = 0;
buf_pos_ = 0;
io_timeout_ms_ = 0;
allocator_.reset();
data_block_reader_.reset();
file_io_handle_.reset();
curr_item_.reset();
@ -116,7 +117,7 @@ void ObDirectLoadDataBlockReader<Header, T>::reset()
}
template <typename Header, typename T>
int ObDirectLoadDataBlockReader<Header, T>::init(int64_t data_block_size, int64_t buf_size,
int ObDirectLoadDataBlockReader<Header, T>::init(int64_t data_block_size,
common::ObCompressorType compressor_type)
{
int ret = common::OB_SUCCESS;
@ -124,21 +125,14 @@ int ObDirectLoadDataBlockReader<Header, T>::init(int64_t data_block_size, int64_
ret = common::OB_INIT_TWICE;
STORAGE_LOG(WARN, "ObDirectLoadDataBlockReader init twice", KR(ret), KP(this));
} else if (OB_UNLIKELY(data_block_size <= 0 || data_block_size % DIO_ALIGN_SIZE != 0 ||
buf_size <= 0 || buf_size % DIO_ALIGN_SIZE != 0 ||
data_block_size > buf_size ||
compressor_type <= common::ObCompressorType::INVALID_COMPRESSOR)) {
ret = common::OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid args", KR(ret), K(data_block_size), K(buf_size), K(compressor_type));
STORAGE_LOG(WARN, "invalid args", KR(ret), K(data_block_size), K(compressor_type));
} else {
allocator_.set_tenant_id(MTL_ID());
if (OB_ISNULL(buf_ = static_cast<char *>(allocator_.alloc(buf_size)))) {
ret = common::OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "fail to allocate memory", KR(ret), K(buf_size));
} else if (OB_FAIL(data_block_reader_.init(buf_size, compressor_type))) {
if (OB_FAIL(data_block_reader_.init(data_block_size, compressor_type))) {
STORAGE_LOG(WARN, "fail to init data block reader", KR(ret));
} else {
data_block_size_ = data_block_size;
buf_capacity_ = buf_size;
io_timeout_ms_ = std::max(GCONF._data_storage_io_timeout / 1000, DEFAULT_IO_WAIT_TIME_MS);
is_inited_ = true;
}
@ -167,7 +161,7 @@ int ObDirectLoadDataBlockReader<Header, T>::open(const ObDirectLoadTmpFileHandle
if (OB_FAIL(file_io_handle_.open(file_handle))) {
STORAGE_LOG(WARN, "fail to open file handle", KR(ret));
} else if (OB_FAIL(switch_next_block())) {
STORAGE_LOG(WARN, "fail to switch next block", KR(ret));
STORAGE_LOG(WARN, "fail to switch next block", KR(ret), K(offset), K(size));
} else {
is_opened_ = true;
}
@ -202,31 +196,70 @@ int ObDirectLoadDataBlockReader<Header, T>::read_next_buffer()
return ret;
}
template <typename Header, typename T>
int ObDirectLoadDataBlockReader<Header, T>::realloc_buf(int64_t size)
{
int ret = OB_SUCCESS;
int64_t align_size = ALIGN_UP(size, DIO_ALIGN_SIZE);
if (buf_capacity_ != align_size && buf_size_ - buf_pos_ <= align_size) {
char *tmp_buf = (char *)ob_malloc(align_size, ObMemAttr(MTL_ID(), "TLD_DBReader"));
if (tmp_buf == nullptr) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "fail to alloc mem", K(align_size), KR(ret));
} else {
if (buf_ != nullptr) {
MEMCPY(tmp_buf, buf_ + buf_pos_, buf_size_ - buf_pos_);
ob_free(buf_);
buf_ = nullptr;
}
}
if (OB_SUCC(ret)) {
buf_ = tmp_buf;
buf_size_ = buf_size_ - buf_pos_;
buf_pos_ = 0;
buf_capacity_ = align_size;
}
}
return ret;
}
template <typename Header, typename T>
int ObDirectLoadDataBlockReader<Header, T>::switch_next_block()
{
int ret = common::OB_SUCCESS;
int64_t data_size = 0;
if (buf_size_ - buf_pos_ < data_block_size_ && OB_FAIL(read_next_buffer())) {
if (OB_FAIL(realloc_buf(data_block_size_))) {
STORAGE_LOG(WARN, "fail to realloc buf", K(data_block_size_), KR(ret));
} else if (buf_size_ - buf_pos_ < DIO_ALIGN_SIZE && OB_FAIL(read_next_buffer())) {
if (OB_UNLIKELY(common::OB_ITER_END != ret)) {
STORAGE_LOG(WARN, "fail to read next buffer", KR(ret));
}
} else if (OB_FAIL(data_block_reader_.prepare_data_block(buf_ + buf_pos_, buf_size_ - buf_pos_,
data_size))) {
if (OB_UNLIKELY(common::OB_BUF_NOT_ENOUGH != ret)) {
STORAGE_LOG(WARN, "fail to prepare data block", KR(ret), K(buf_pos_), K(buf_size_));
} else {
if (OB_FAIL(read_next_buffer())) {
ret = OB_SUCCESS;
if (data_size > buf_capacity_) {
if (OB_FAIL(realloc_buf(data_size))) {
STORAGE_LOG(WARN, "fail to alloc buf", KR(ret));
}
}
if (OB_FAIL(ret)) {
//pass
} else if (OB_FAIL(read_next_buffer())) {
STORAGE_LOG(WARN, "fail to read next buffer", KR(ret));
} else if (OB_FAIL(data_block_reader_.prepare_data_block(buf_ + buf_pos_,
buf_size_ - buf_pos_, data_size))) {
STORAGE_LOG(WARN, "fail to prepare data block", KR(ret), K(buf_pos_), K(buf_size_));
STORAGE_LOG(WARN, "fail to prepare data block", KR(ret), K(buf_pos_), K(buf_size_), K(data_size));
}
}
}
if (OB_SUCC(ret)) {
const int64_t data_block_size = ALIGN_UP(data_size, DIO_ALIGN_SIZE);
buf_pos_ += MAX(data_block_size_, data_block_size);
buf_pos_ += data_block_size;
++block_count_;
if (OB_FAIL(prepare_read_block())) {
STORAGE_LOG(WARN, "fail to prepare read block", KR(ret));

View File

@ -48,7 +48,6 @@ protected:
virtual int pre_write_item() { return common::OB_SUCCESS; }
virtual int pre_flush_buffer() { return common::OB_SUCCESS; }
int flush_buffer();
int flush_extra_buffer(const T &item);
protected:
int64_t data_block_size_;
char *extra_buf_;
@ -188,14 +187,6 @@ int ObDirectLoadDataBlockWriter<Header, T>::write_item(const T &item)
} else if (OB_FAIL(data_block_writer_.write_item(item))) {
STORAGE_LOG(WARN, "fail to write item", KR(ret));
}
} else if (common::OB_SIZE_OVERFLOW == ret && nullptr != extra_buf_) {
if (data_block_writer_.has_item() && OB_FAIL(flush_buffer())) {
STORAGE_LOG(WARN, "fail to flush buffer", KR(ret));
} else if (OB_FAIL(pre_write_item())) {
STORAGE_LOG(WARN, "fail to pre write item", KR(ret));
} else if (OB_FAIL(flush_extra_buffer(item))) {
STORAGE_LOG(WARN, "fail to flush extra buffer", KR(ret));
}
} else {
STORAGE_LOG(WARN, "fail to write item", KR(ret));
}
@ -214,48 +205,20 @@ int ObDirectLoadDataBlockWriter<Header, T>::flush_buffer()
} else {
char *buf = nullptr;
int64_t buf_size = 0;
int64_t align_buf_size = 0;
if (OB_FAIL(data_block_writer_.build_data_block(buf, buf_size))) {
STORAGE_LOG(WARN, "fail to build data block", KR(ret));
} else if (OB_FAIL(file_io_handle_.write(buf, data_block_size_))) {
} else if (FALSE_IT(align_buf_size = ALIGN_UP(buf_size, DIO_ALIGN_SIZE))) {
} else if (OB_FAIL(file_io_handle_.aio_write(buf, align_buf_size))) {
STORAGE_LOG(WARN, "fail to do aio write tmp file", KR(ret));
} else if (nullptr != callback_ && OB_FAIL(callback_->write(buf, data_block_size_, offset_))) {
} else if (nullptr != callback_ && OB_FAIL(callback_->write(buf, align_buf_size, offset_))) {
STORAGE_LOG(WARN, "fail to callback write", KR(ret));
} else {
OB_TABLE_LOAD_STATISTICS_INC(external_write_bytes, data_block_size_);
OB_TABLE_LOAD_STATISTICS_INC(external_write_bytes, align_buf_size);
data_block_writer_.reuse();
offset_ += data_block_size_;
offset_ += align_buf_size;
++block_count_;
max_block_size_ = MAX(max_block_size_, data_block_size_);
}
}
return ret;
}
template <typename Header, typename T>
int ObDirectLoadDataBlockWriter<Header, T>::flush_extra_buffer(const T &item)
{
OB_TABLE_LOAD_STATISTICS_TIME_COST(INFO, external_flush_buffer_time_us);
int ret = common::OB_SUCCESS;
if (OB_FAIL(pre_flush_buffer())) {
STORAGE_LOG(WARN, "fail to pre flush buffer", KR(ret));
} else {
int64_t data_size = 0;
int64_t data_block_size = 0;
if (OB_FAIL(
data_block_writer_.build_data_block(item, extra_buf_, extra_buf_size_, data_size))) {
STORAGE_LOG(WARN, "fail to build data block", KR(ret));
} else if (FALSE_IT(data_block_size = ALIGN_UP(data_size, DIO_ALIGN_SIZE))) {
} else if (OB_FAIL(file_io_handle_.write(extra_buf_, data_block_size))) {
STORAGE_LOG(WARN, "fail to do aio write tmp file", KR(ret));
} else if (nullptr != callback_ &&
OB_FAIL(callback_->write(extra_buf_, data_block_size, offset_))) {
STORAGE_LOG(WARN, "fail to callback write", KR(ret));
} else {
OB_TABLE_LOAD_STATISTICS_INC(external_write_bytes, data_block_size);
data_block_writer_.reuse();
offset_ += data_block_size;
++block_count_;
max_block_size_ = MAX(max_block_size_, data_block_size);
max_block_size_ = MAX(max_block_size_, align_buf_size);
}
}
return ret;

View File

@ -33,7 +33,7 @@ public:
virtual ~ObDirectLoadExternalSequentialScanner();
void reuse();
void reset();
int init(int64_t data_block_size, int64_t buf_size, common::ObCompressorType compressor_type,
int init(int64_t data_block_size, common::ObCompressorType compressor_type,
const ObDirectLoadExternalFragmentArray &fragments);
int get_next_item(const T *&item) override;
private:
@ -74,7 +74,7 @@ void ObDirectLoadExternalSequentialScanner<T>::reset()
template <typename T>
int ObDirectLoadExternalSequentialScanner<T>::init(
int64_t data_block_size, int64_t buf_size, common::ObCompressorType compressor_type,
int64_t data_block_size, common::ObCompressorType compressor_type,
const ObDirectLoadExternalFragmentArray &fragments)
{
int ret = common::OB_SUCCESS;
@ -82,16 +82,14 @@ int ObDirectLoadExternalSequentialScanner<T>::init(
ret = common::OB_INIT_TWICE;
STORAGE_LOG(WARN, "ObDirectLoadExternalSequentialScanner init twice", KR(ret), KP(this));
} else if (OB_UNLIKELY(data_block_size <= 0 || data_block_size % DIO_ALIGN_SIZE != 0 ||
buf_size <= 0 || buf_size % DIO_ALIGN_SIZE != 0 ||
data_block_size > buf_size ||
compressor_type <= common::ObCompressorType::INVALID_COMPRESSOR ||
fragments.empty())) {
ret = common::OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", KR(ret), K(buf_size), K(compressor_type), K(fragments));
STORAGE_LOG(WARN, "invalid argument", KR(ret), K(compressor_type), K(fragments));
} else {
if (OB_FAIL(fragments_.assign(fragments))) {
STORAGE_LOG(WARN, "fail to assign fragments", KR(ret));
} else if (OB_FAIL(reader_.init(data_block_size, buf_size, compressor_type))) {
} else if (OB_FAIL(reader_.init(data_block_size, compressor_type))) {
STORAGE_LOG(WARN, "fail to init fragment reader", KR(ret));
} else if (OB_FAIL(switch_next_fragment())) {
STORAGE_LOG(WARN, "fail to switch next fragment", KR(ret));
@ -158,7 +156,7 @@ class ObDirectLoadExternalSortScanner : public ObDirectLoadExternalIterator<T>
public:
ObDirectLoadExternalSortScanner();
virtual ~ObDirectLoadExternalSortScanner();
int init(int64_t data_block_size, int64_t buf_size, common::ObCompressorType compressor_type,
int init(int64_t data_block_size, common::ObCompressorType compressor_type,
const ObDirectLoadExternalFragmentArray &fragments, Compare *compare);
int get_next_item(const T *&item) override;
void reuse();
@ -197,7 +195,7 @@ void ObDirectLoadExternalSortScanner<T, Compare>::reuse()
template <typename T, typename Compare>
int ObDirectLoadExternalSortScanner<T, Compare>::init(
int64_t data_block_size, int64_t buf_size, common::ObCompressorType compressor_type,
int64_t data_block_size, common::ObCompressorType compressor_type,
const ObDirectLoadExternalFragmentArray &fragments, Compare *compare)
{
int ret = common::OB_SUCCESS;
@ -205,12 +203,10 @@ int ObDirectLoadExternalSortScanner<T, Compare>::init(
ret = common::OB_INIT_TWICE;
STORAGE_LOG(WARN, "ObDirectLoadExternalSortScanner init twice", KR(ret), KP(this));
} else if (OB_UNLIKELY(data_block_size <= 0 || data_block_size % DIO_ALIGN_SIZE != 0 ||
buf_size <= 0 || buf_size % DIO_ALIGN_SIZE != 0 ||
data_block_size > buf_size ||
compressor_type <= common::ObCompressorType::INVALID_COMPRESSOR ||
fragments.empty() || nullptr == compare)) {
ret = common::OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", KR(ret), K(buf_size), K(compressor_type), K(fragments),
STORAGE_LOG(WARN, "invalid argument", KR(ret), K(compressor_type), K(fragments),
KP(compare));
} else {
allocator_.set_tenant_id(MTL_ID());
@ -220,7 +216,7 @@ int ObDirectLoadExternalSortScanner<T, Compare>::init(
if (OB_ISNULL(reader = OB_NEWx(ExternalReader, (&allocator_)))) {
ret = common::OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "fail to new fragment reader", KR(ret));
} else if (OB_FAIL(reader->init(data_block_size, buf_size, compressor_type))) {
} else if (OB_FAIL(reader->init(data_block_size, compressor_type))) {
STORAGE_LOG(WARN, "fail to init fragment reader", KR(ret));
} else if (OB_FAIL(reader->open(fragment.file_handle_, 0, fragment.file_size_))) {
STORAGE_LOG(WARN, "fail to open fragment", KR(ret));

View File

@ -69,7 +69,6 @@ int ObDirectLoadMemLoader::work()
ObDirectLoadExternalFragment &fragment = fragments_.at(i);
ExternalReader external_reader;
if (OB_FAIL(external_reader.init(mem_ctx_->table_data_desc_.external_data_block_size_,
fragment.max_data_block_size_,
mem_ctx_->table_data_desc_.compressor_type_))) {
LOG_WARN("fail to init external reader", KR(ret));
} else if (OB_FAIL(external_reader.open(fragment.file_handle_, 0, fragment.file_size_))) {

View File

@ -30,7 +30,7 @@ ObDirectLoadMultipleHeapTableIndexBlockReader::~ObDirectLoadMultipleHeapTableInd
int ObDirectLoadMultipleHeapTableIndexBlockReader::init(int64_t data_block_size,
ObCompressorType compressor_type)
{
return ParentType::init(data_block_size, data_block_size, compressor_type);
return ParentType::init(data_block_size, compressor_type);
}
int ObDirectLoadMultipleHeapTableIndexBlockReader::get_next_index(

View File

@ -49,7 +49,6 @@ int ObDirectLoadMultipleHeapTableTabletWholeScanner::init(
if (OB_FAIL(index_scanner_.init(heap_table, tablet_id, table_data_desc))) {
LOG_WARN("fail to init index scanner", KR(ret));
} else if (OB_FAIL(data_block_reader_.init(table_data_desc.sstable_data_block_size_,
heap_table->get_meta().max_data_block_size_,
table_data_desc.compressor_type_))) {
LOG_WARN("fail to init data block reader", KR(ret));
} else if (OB_FAIL(switch_next_fragment())) {

View File

@ -172,7 +172,6 @@ int ObDirectLoadMultipleHeapTableSorter::work()
const ObDirectLoadExternalFragment &fragment = fragments_.at(i);
ExternalReader external_reader;
if (OB_FAIL(external_reader.init(mem_ctx_->table_data_desc_.external_data_block_size_,
fragment.max_data_block_size_,
mem_ctx_->table_data_desc_.compressor_type_))) {
LOG_WARN("fail to init external reader", KR(ret));
} else if (OB_FAIL(external_reader.open(fragment.file_handle_, 0, fragment.file_size_))) {

View File

@ -63,7 +63,6 @@ int ObDirectLoadMultipleSSTableDataBlockScanner::init(
table_data_desc.compressor_type_))) {
LOG_WARN("fail to index block reader", KR(ret));
} else if (OB_FAIL(data_block_reader.init(table_data_desc.sstable_data_block_size_,
sstable->get_meta().max_data_block_size_,
table_data_desc.compressor_type_))) {
LOG_WARN("fail to data block reader", KR(ret));
} else if (OB_FAIL(locate_left_border(index_block_reader, data_block_reader))) {

View File

@ -62,7 +62,6 @@ int ObDirectLoadMultipleSSTableIndexBlockMetaWholeScanner::init(
table_data_desc.compressor_type_))) {
LOG_WARN("fail to index block reader", KR(ret));
} else if (OB_FAIL(data_block_reader_.init(table_data_desc.sstable_data_block_size_,
sstable->get_meta().max_data_block_size_,
table_data_desc.compressor_type_))) {
LOG_WARN("fail to data block reader", KR(ret));
} else {
@ -148,7 +147,6 @@ int ObDirectLoadMultipleSSTableIndexBlockMetaTabletWholeScanner::init(
table_data_desc.compressor_type_))) {
LOG_WARN("fail to index block reader", KR(ret));
} else if (OB_FAIL(data_block_reader_.init(table_data_desc.sstable_data_block_size_,
sstable->get_meta().max_data_block_size_,
table_data_desc.compressor_type_))) {
LOG_WARN("fail to data block reader", KR(ret));
} else if (OB_FAIL(locate_left_border(index_block_reader_, data_block_reader_))) {

View File

@ -60,7 +60,6 @@ int ObDirectLoadMultipleSSTableScanner::init(ObDirectLoadMultipleSSTable *sstabl
if (OB_FAIL(data_block_scanner_.init(sstable, table_data_desc, range, datum_utils))) {
LOG_WARN("fail to init data block scanner", KR(ret));
} else if (OB_FAIL(data_block_reader_.init(table_data_desc.sstable_data_block_size_,
sstable->get_meta().max_data_block_size_,
table_data_desc.compressor_type_))) {
LOG_WARN("fail to init data block reader", KR(ret));
} else {

View File

@ -573,7 +573,6 @@ int ObDirectLoadPartitionHeapTableMergeTask::RowIterator::init(
}
// init scanner_
else if (OB_FAIL(scanner_.init(merge_param.table_data_desc_.external_data_block_size_,
external_table->get_meta().max_data_block_size_,
merge_param.table_data_desc_.compressor_type_,
external_table->get_fragments()))) {
LOG_WARN("fail to init fragment scanner", KR(ret));

View File

@ -31,7 +31,7 @@ ObDirectLoadSSTableIndexBlockReader::~ObDirectLoadSSTableIndexBlockReader()
int ObDirectLoadSSTableIndexBlockReader::init(int64_t data_block_size,
ObCompressorType compressor_type)
{
return ParentType::init(data_block_size, data_block_size, compressor_type);
return ParentType::init(data_block_size, compressor_type);
}
int ObDirectLoadSSTableIndexBlockReader::get_next_entry(const ObDirectLoadSSTableIndexEntry *&entry)