[fix](inverted index) fix memory leaks in inverted index (#52747)
This commit is contained in:
103
be/src/olap/rowset/segment_v2/inverted_index_common.h
Normal file
103
be/src/olap/rowset/segment_v2/inverted_index_common.h
Normal file
@ -0,0 +1,103 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <CLucene.h> // IWYU pragma: keep
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include "common/logging.h"
|
||||
|
||||
namespace lucene::store {
|
||||
class Directory;
|
||||
} // namespace lucene::store
|
||||
|
||||
namespace doris::segment_v2 {
|
||||
|
||||
// Deleter for smart pointers holding a CLucene lucene::store::Directory.
// CLucene Directory objects are reference-counted (see _CLDECDELETE), so
// releasing one must decrement the ref count rather than `delete` it
// directly; the object is freed only when the count reaches zero.
struct DirectoryDeleter {
    void operator()(lucene::store::Directory* ptr) const { _CLDECDELETE(ptr); }
};
|
||||
|
||||
// Carries error state out of a "finally"-style cleanup sequence
// (see finally_close / the FINALLY_* macros below) so the caller can
// decide afterwards whether to return an error Status or rethrow.
struct ErrorContext {
    // Accumulated human-readable description of the error(s) encountered.
    std::string err_msg;
    // Captured exception, if any; null when no error occurred. NOTE(review):
    // successive failures overwrite this, so only the last one is kept.
    std::exception_ptr eptr;
};
|
||||
|
||||
// Satisfied by pointer-like types (raw or smart) whose pointee exposes a
// close() member function, e.g. std::unique_ptr<lucene::store::IndexOutput>.
template <typename T>
concept HasClose = requires(T t) {
    { t->close() };
};
|
||||
|
||||
template <typename PtrType>
|
||||
requires HasClose<PtrType>
|
||||
void finally_close(PtrType& resource, ErrorContext& error_context) {
|
||||
if (resource) {
|
||||
try {
|
||||
resource->close();
|
||||
} catch (CLuceneError& err) {
|
||||
error_context.eptr = std::current_exception();
|
||||
error_context.err_msg.append("Error occurred while closing resource: ");
|
||||
error_context.err_msg.append(err.what());
|
||||
LOG(ERROR) << error_context.err_msg;
|
||||
} catch (...) {
|
||||
error_context.eptr = std::current_exception();
|
||||
error_context.err_msg.append("Error occurred while closing resource");
|
||||
LOG(ERROR) << error_context.err_msg;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// These macros may be unused in some translation units that include this
// header; silence clang's -Wunused-macros for them.
#if defined(__clang__)
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-macros"
#endif

// Close `resource`, routing any exception into a variable named
// `error_context` (of type ErrorContext) that MUST be declared in the
// enclosing scope — the static_assert turns a missing declaration into a
// readable compile error instead of an obscure lookup failure.
#define FINALLY_CLOSE(resource)                                               \
    {                                                                         \
        static_assert(sizeof(error_context) > 0,                              \
                      "error_context must be defined before using FINALLY macro!"); \
        finally_close(resource, error_context);                               \
    }

// Return ERROR after finally: runs `finally_block` (typically one or more
// FINALLY_CLOSE calls) and, if any error was captured, returns a Status
// built from the accumulated message. For functions returning Status.
#define FINALLY(finally_block)                                                \
    {                                                                         \
        static_assert(sizeof(error_context) > 0,                              \
                      "error_context must be defined before using FINALLY macro!"); \
        finally_block;                                                        \
        if (error_context.eptr) {                                             \
            return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(error_context.err_msg); \
        }                                                                     \
    }

// Re-throw the exception after finally: same as FINALLY but propagates the
// captured exception instead of converting it to a Status. For callers that
// handle CLucene exceptions themselves.
#define FINALLY_EXCEPTION(finally_block)                                      \
    {                                                                         \
        static_assert(sizeof(error_context) > 0,                              \
                      "error_context must be defined before using FINALLY macro!"); \
        finally_block;                                                        \
        if (error_context.eptr) {                                             \
            std::rethrow_exception(error_context.eptr);                       \
        }                                                                     \
    }

#if defined(__clang__)
#pragma clang diagnostic pop
#endif
|
||||
|
||||
} // namespace doris::segment_v2
|
||||
@ -114,9 +114,9 @@ DorisCompoundReader::DorisCompoundReader(lucene::store::Directory* d, const char
|
||||
int32_t read_buffer_size, bool open_idx_file_cache)
|
||||
: readBufferSize(read_buffer_size),
|
||||
dir(d),
|
||||
ram_dir(new lucene::store::RAMDirectory()),
|
||||
ram_dir(std::make_unique<lucene::store::RAMDirectory>()),
|
||||
file_name(name),
|
||||
entries(_CLNEW EntriesType(true, true)) {
|
||||
entries(std::make_unique<EntriesType>(true, true)) {
|
||||
bool success = false;
|
||||
try {
|
||||
if (dir->fileLength(name) == 0) {
|
||||
@ -203,7 +203,6 @@ DorisCompoundReader::~DorisCompoundReader() {
|
||||
LOG(ERROR) << "DorisCompoundReader finalize error:" << err.what();
|
||||
}
|
||||
}
|
||||
_CLDELETE(entries)
|
||||
}
|
||||
|
||||
const char* DorisCompoundReader::getClassName() {
|
||||
@ -314,7 +313,6 @@ void DorisCompoundReader::close() {
|
||||
}
|
||||
if (ram_dir) {
|
||||
ram_dir->close();
|
||||
_CLDELETE(ram_dir)
|
||||
}
|
||||
if (dir) {
|
||||
dir->close();
|
||||
|
||||
@ -68,12 +68,12 @@ private:
|
||||
int32_t readBufferSize;
|
||||
// base info
|
||||
lucene::store::Directory* dir = nullptr;
|
||||
lucene::store::RAMDirectory* ram_dir = nullptr;
|
||||
std::unique_ptr<lucene::store::RAMDirectory> ram_dir;
|
||||
std::string directory;
|
||||
std::string file_name;
|
||||
CL_NS(store)::IndexInput* stream = nullptr;
|
||||
// The life cycle of entries should be consistent with that of the DorisCompoundReader.
|
||||
EntriesType* entries = nullptr;
|
||||
std::unique_ptr<EntriesType> entries;
|
||||
std::mutex _this_lock;
|
||||
bool _closed = false;
|
||||
|
||||
|
||||
@ -21,6 +21,7 @@
|
||||
#include "io/fs/file_writer.h"
|
||||
#include "io/fs/local_file_system.h"
|
||||
#include "olap/rowset/segment_v2/inverted_index_cache.h"
|
||||
#include "olap/rowset/segment_v2/inverted_index_common.h"
|
||||
#include "olap/rowset/segment_v2/inverted_index_desc.h"
|
||||
#include "olap/rowset/segment_v2/inverted_index_fs_directory.h"
|
||||
#include "olap/rowset/segment_v2/inverted_index_reader.h"
|
||||
@ -137,15 +138,14 @@ Status InvertedIndexFileWriter::close() {
|
||||
if (_storage_format == InvertedIndexStorageFormatPB::V1) {
|
||||
for (const auto& entry : _indices_dirs) {
|
||||
const auto& dir = entry.second;
|
||||
auto* cfsWriter = _CLNEW DorisCompoundFileWriter(dir.get());
|
||||
DorisCompoundFileWriter cfsWriter(dir.get());
|
||||
// write compound file
|
||||
_file_size += cfsWriter->writeCompoundFile();
|
||||
_file_size += cfsWriter.writeCompoundFile();
|
||||
// delete index path, which contains separated inverted index files
|
||||
if (std::strcmp(dir->getObjectName(), "DorisFSDirectory") == 0) {
|
||||
auto* compound_dir = static_cast<DorisFSDirectory*>(dir.get());
|
||||
compound_dir->deleteDirectory();
|
||||
}
|
||||
_CLDELETE(cfsWriter)
|
||||
}
|
||||
} else {
|
||||
_file_size = write();
|
||||
@ -337,50 +337,63 @@ size_t DorisCompoundFileWriter::writeCompoundFile() {
|
||||
ram_dir.close();
|
||||
|
||||
auto compound_fs = ((DorisFSDirectory*)directory)->getCompoundFileSystem();
|
||||
auto* out_dir = DorisFSDirectoryFactory::getDirectory(compound_fs, idx_path.c_str());
|
||||
std::unique_ptr<lucene::store::Directory, DirectoryDeleter> out_dir;
|
||||
std::unique_ptr<lucene::store::IndexOutput> output;
|
||||
|
||||
auto* out = out_dir->createOutput(idx_name.c_str());
|
||||
if (out == nullptr) {
|
||||
LOG(WARNING) << "Write compound file error: CompoundDirectory output is nullptr.";
|
||||
_CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error");
|
||||
}
|
||||
std::unique_ptr<lucene::store::IndexOutput> output(out);
|
||||
size_t start = output->getFilePointer();
|
||||
output->writeVInt(file_count);
|
||||
// write file entries
|
||||
int64_t data_offset = header_len;
|
||||
uint8_t header_buffer[buffer_length];
|
||||
for (int i = 0; i < sorted_files.size(); ++i) {
|
||||
auto file = sorted_files[i];
|
||||
output->writeString(file.filename); // FileName
|
||||
// DataOffset
|
||||
if (i < header_file_count) {
|
||||
// file data write in header, so we set its offset to -1.
|
||||
output->writeLong(-1);
|
||||
} else {
|
||||
output->writeLong(data_offset);
|
||||
ErrorContext error_context;
|
||||
size_t compound_file_size = 0;
|
||||
try {
|
||||
out_dir = std::unique_ptr<lucene::store::Directory, DirectoryDeleter>(
|
||||
DorisFSDirectoryFactory::getDirectory(compound_fs, idx_path.c_str()));
|
||||
output = std::unique_ptr<lucene::store::IndexOutput>(
|
||||
out_dir->createOutput(idx_name.c_str()));
|
||||
if (output == nullptr) {
|
||||
LOG(WARNING) << "Write compound file error: CompoundDirectory output is nullptr.";
|
||||
_CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error");
|
||||
}
|
||||
output->writeLong(file.filesize); // FileLength
|
||||
if (i < header_file_count) {
|
||||
// append data
|
||||
copyFile(file.filename.c_str(), directory, output.get(), header_buffer, buffer_length);
|
||||
} else {
|
||||
data_offset += file.filesize;
|
||||
|
||||
size_t start = output->getFilePointer();
|
||||
output->writeVInt(file_count);
|
||||
// write file entries
|
||||
int64_t data_offset = header_len;
|
||||
uint8_t header_buffer[buffer_length];
|
||||
for (int i = 0; i < sorted_files.size(); ++i) {
|
||||
auto file = sorted_files[i];
|
||||
output->writeString(file.filename); // FileName
|
||||
// DataOffset
|
||||
if (i < header_file_count) {
|
||||
// file data write in header, so we set its offset to -1.
|
||||
output->writeLong(-1);
|
||||
} else {
|
||||
output->writeLong(data_offset);
|
||||
}
|
||||
output->writeLong(file.filesize); // FileLength
|
||||
if (i < header_file_count) {
|
||||
// append data
|
||||
copyFile(file.filename.c_str(), directory, output.get(), header_buffer,
|
||||
buffer_length);
|
||||
} else {
|
||||
data_offset += file.filesize;
|
||||
}
|
||||
}
|
||||
// write rest files' data
|
||||
uint8_t data_buffer[buffer_length];
|
||||
for (int i = header_file_count; i < sorted_files.size(); ++i) {
|
||||
auto file = sorted_files[i];
|
||||
copyFile(file.filename.c_str(), directory, output.get(), data_buffer, buffer_length);
|
||||
}
|
||||
|
||||
compound_file_size = output->getFilePointer() - start;
|
||||
} catch (CLuceneError& err) {
|
||||
error_context.eptr = std::current_exception();
|
||||
error_context.err_msg.append("writeCompoundFile exception, error msg: ");
|
||||
error_context.err_msg.append(err.what());
|
||||
LOG(ERROR) << error_context.err_msg;
|
||||
}
|
||||
// write rest files' data
|
||||
uint8_t data_buffer[buffer_length];
|
||||
for (int i = header_file_count; i < sorted_files.size(); ++i) {
|
||||
auto file = sorted_files[i];
|
||||
copyFile(file.filename.c_str(), directory, output.get(), data_buffer, buffer_length);
|
||||
}
|
||||
out_dir->close();
|
||||
// NOTE: need to decrease ref count, but not to delete here,
|
||||
// because index cache may get the same directory from DIRECTORIES
|
||||
_CLDECDELETE(out_dir)
|
||||
auto compound_file_size = output->getFilePointer() - start;
|
||||
output->close();
|
||||
//LOG(INFO) << (idx_path / idx_name).c_str() << " size:" << compound_file_size;
|
||||
FINALLY_EXCEPTION({
|
||||
FINALLY_CLOSE(out_dir);
|
||||
FINALLY_CLOSE(output);
|
||||
})
|
||||
return compound_file_size;
|
||||
}
|
||||
|
||||
|
||||
@ -23,6 +23,7 @@
|
||||
#include "inverted_index_desc.h"
|
||||
#include "io/fs/file_reader.h"
|
||||
#include "io/fs/file_writer.h"
|
||||
#include "olap/rowset/segment_v2/inverted_index_common.h"
|
||||
#include "olap/tablet_schema.h"
|
||||
#include "util/debug_points.h"
|
||||
#include "util/slice.h"
|
||||
@ -516,14 +517,21 @@ lucene::store::IndexOutput* DorisFSDirectory::createOutput(const char* name) {
|
||||
assert(!exists);
|
||||
}
|
||||
auto* ret = _CLNEW FSIndexOutput();
|
||||
ErrorContext error_context;
|
||||
try {
|
||||
ret->init(fs, fl);
|
||||
} catch (CLuceneError& err) {
|
||||
ret->close();
|
||||
_CLDELETE(ret)
|
||||
LOG(WARNING) << "FSIndexOutput init error: " << err.what();
|
||||
_CLTHROWA(CL_ERR_IO, "FSIndexOutput init error");
|
||||
error_context.eptr = std::current_exception();
|
||||
error_context.err_msg.append("FSIndexOutput init error: ");
|
||||
error_context.err_msg.append(err.what());
|
||||
LOG(ERROR) << error_context.err_msg;
|
||||
}
|
||||
FINALLY_EXCEPTION({
|
||||
if (error_context.eptr) {
|
||||
FINALLY_CLOSE(ret);
|
||||
_CLDELETE(ret);
|
||||
}
|
||||
})
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user