[optimize] provide a better defer operator (#5706)

Author: stdpain
Date: 2021-05-12 10:37:23 +08:00
Committed by: GitHub
Parent: f4f0253e4f
Commit: 5fed34fcfe

7 changed files with 68 additions and 54 deletions
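In short, this commit swaps the std::function-based DeferOp for a templated Defer that stores the closure type directly, so call sites can hand a lambda straight to the guard instead of wrapping it in std::bind. A minimal, self-contained sketch of the before/after shapes (class bodies abridged from the defer_op.h hunk below; the example resources are hypothetical):

```cpp
#include <cstdio>
#include <functional>
#include <utility>

// Pre-change guard: the deferred action is type-erased into a std::function.
class DeferOp {
public:
    explicit DeferOp(std::function<void()> func) : _func(std::move(func)) {}
    ~DeferOp() { _func(); }

private:
    std::function<void()> _func;
};

// Post-change guard: the closure type is a template parameter
// (mirrors the defer_op.h hunk later in this commit).
template <class T>
class Defer {
public:
    Defer(T& closure) : _closure(closure) {}
    Defer(T&& closure) : _closure(std::move(closure)) {}
    ~Defer() { _closure(); }

private:
    T _closure;
};

int main() {
    {
        // Before: lambdas were usually routed through std::bind into DeferOp.
        int* old_resource = new int(1);
        auto deleter = [old_resource]() { delete old_resource; };
        DeferOp cleanup(std::bind<void>(deleter));
        std::printf("old_resource=%d\n", *old_resource);
    } // DeferOp runs the deleter here

    {
        // After: the lambda is passed directly; C++17 CTAD deduces the closure type.
        int* new_resource = new int(2);
        Defer cleanup{[new_resource]() { delete new_resource; }};
        std::printf("new_resource=%d\n", *new_resource);
    } // Defer runs the closure here
    return 0;
}
```

Storing the closure as T avoids the std::function wrapper and the std::bind glue the old call sites needed, which is what the remaining hunks clean up.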

@ -74,8 +74,8 @@ public:
explicit RowBlockMerger(TabletSharedPtr tablet);
virtual ~RowBlockMerger();
bool merge(const std::vector<RowBlock*>& row_block_arr, RowsetWriter* rowset_writer, std::shared_ptr<MemTracker> parent,
uint64_t* merged_rows);
bool merge(const std::vector<RowBlock*>& row_block_arr, RowsetWriter* rowset_writer,
std::shared_ptr<MemTracker> parent, uint64_t* merged_rows);
private:
struct MergeElement {
@ -713,7 +713,8 @@ bool RowBlockSorter::sort(RowBlock** row_block) {
return true;
}
RowBlockAllocator::RowBlockAllocator(const TabletSchema& tablet_schema, std::shared_ptr<MemTracker> parent, size_t memory_limitation)
RowBlockAllocator::RowBlockAllocator(const TabletSchema& tablet_schema,
std::shared_ptr<MemTracker> parent, size_t memory_limitation)
: _tablet_schema(tablet_schema),
_mem_tracker(MemTracker::CreateTracker(-1, "RowBlockAllocator", parent, false)),
_row_len(tablet_schema.row_size()),
@ -723,16 +724,18 @@ RowBlockAllocator::RowBlockAllocator(const TabletSchema& tablet_schema, std::sha
RowBlockAllocator::~RowBlockAllocator() {
if (_mem_tracker->consumption() != 0) {
LOG(WARNING) << "memory lost in RowBlockAllocator. memory_size=" << _mem_tracker->consumption();
LOG(WARNING) << "memory lost in RowBlockAllocator. memory_size="
<< _mem_tracker->consumption();
}
}
OLAPStatus RowBlockAllocator::allocate(RowBlock** row_block, size_t num_rows, bool null_supported) {
size_t row_block_size = _row_len * num_rows;
if (_memory_limitation > 0 && _mem_tracker->consumption() + row_block_size > _memory_limitation) {
if (_memory_limitation > 0 &&
_mem_tracker->consumption() + row_block_size > _memory_limitation) {
LOG(WARNING) << "RowBlockAllocator::alocate() memory exceeded. "
<< "m_memory_allocated=" << _mem_tracker->consumption();
<< "m_memory_allocated=" << _mem_tracker->consumption();
*row_block = nullptr;
return OLAP_SUCCESS;
}
@ -751,7 +754,8 @@ OLAPStatus RowBlockAllocator::allocate(RowBlock** row_block, size_t num_rows, bo
_mem_tracker->Consume(row_block_size);
VLOG_NOTICE << "RowBlockAllocator::allocate() this=" << this << ", num_rows=" << num_rows
<< ", m_memory_allocated=" << _mem_tracker->consumption() << ", row_block_addr=" << *row_block;
<< ", m_memory_allocated=" << _mem_tracker->consumption()
<< ", row_block_addr=" << *row_block;
return OLAP_SUCCESS;
}
@ -765,7 +769,8 @@ void RowBlockAllocator::release(RowBlock* row_block) {
VLOG_NOTICE << "RowBlockAllocator::release() this=" << this
<< ", num_rows=" << row_block->capacity()
<< ", m_memory_allocated=" << _mem_tracker->consumption() << ", row_block_addr=" << row_block;
<< ", m_memory_allocated=" << _mem_tracker->consumption()
<< ", row_block_addr=" << row_block;
delete row_block;
}
@ -773,11 +778,12 @@ RowBlockMerger::RowBlockMerger(TabletSharedPtr tablet) : _tablet(tablet) {}
RowBlockMerger::~RowBlockMerger() {}
bool RowBlockMerger::merge(const std::vector<RowBlock*>& row_block_arr, RowsetWriter* rowset_writer, std::shared_ptr<MemTracker> parent,
uint64_t* merged_rows) {
bool RowBlockMerger::merge(const std::vector<RowBlock*>& row_block_arr, RowsetWriter* rowset_writer,
std::shared_ptr<MemTracker> parent, uint64_t* merged_rows) {
uint64_t tmp_merged_rows = 0;
RowCursor row_cursor;
std::shared_ptr<MemTracker> tracker(MemTracker::CreateTracker(-1, "RowBlockMerger", parent, false));
std::shared_ptr<MemTracker> tracker(
MemTracker::CreateTracker(-1, "RowBlockMerger", parent, false));
std::unique_ptr<MemPool> mem_pool(new MemPool(tracker.get()));
std::unique_ptr<ObjectPool> agg_object_pool(new ObjectPool());
if (row_cursor.init(_tablet->tablet_schema()) != OLAP_SUCCESS) {
@ -898,8 +904,12 @@ OLAPStatus LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader,
return status;
}
SchemaChangeDirectly::SchemaChangeDirectly(const RowBlockChanger& row_block_changer, std::shared_ptr<MemTracker> mem_tracker)
: SchemaChange(mem_tracker), _row_block_changer(row_block_changer), _row_block_allocator(nullptr), _cursor(nullptr) {}
SchemaChangeDirectly::SchemaChangeDirectly(const RowBlockChanger& row_block_changer,
std::shared_ptr<MemTracker> mem_tracker)
: SchemaChange(mem_tracker),
_row_block_changer(row_block_changer),
_row_block_allocator(nullptr),
_cursor(nullptr) {}
SchemaChangeDirectly::~SchemaChangeDirectly() {
VLOG_NOTICE << "~SchemaChangeDirectly()";
@ -920,7 +930,7 @@ bool SchemaChangeDirectly::_write_row_block(RowsetWriter* rowset_writer, RowBloc
}
OLAPStatus reserve_block(std::unique_ptr<RowBlock, RowBlockDeleter>* block_handle_ptr, int row_num,
RowBlockAllocator* allocator) {
RowBlockAllocator* allocator) {
auto& block_handle = *block_handle_ptr;
if (block_handle == nullptr || block_handle->capacity() < row_num) {
// release old block and alloc new block
@ -1010,7 +1020,7 @@ OLAPStatus SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
res = _row_block_changer.change_row_block(ref_row_block, rowset_reader->version().second,
new_row_block.get(), &filtered_rows);
RETURN_NOT_OK_LOG(res, "failed to change data in row block.");
// rows filtered by delete handler one by one
add_filtered_rows(filtered_rows);
@ -1053,7 +1063,8 @@ OLAPStatus SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
return res;
}
SchemaChangeWithSorting::SchemaChangeWithSorting(const RowBlockChanger& row_block_changer, std::shared_ptr<MemTracker> mem_tracker,
SchemaChangeWithSorting::SchemaChangeWithSorting(const RowBlockChanger& row_block_changer,
std::shared_ptr<MemTracker> mem_tracker,
size_t memory_limitation)
: SchemaChange(mem_tracker),
_row_block_changer(row_block_changer),
@ -1078,8 +1089,8 @@ OLAPStatus SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
TabletSharedPtr new_tablet,
TabletSharedPtr base_tablet) {
if (_row_block_allocator == nullptr) {
_row_block_allocator =
new (nothrow) RowBlockAllocator(new_tablet->tablet_schema(), _mem_tracker, _memory_limitation);
_row_block_allocator = new (nothrow)
RowBlockAllocator(new_tablet->tablet_schema(), _mem_tracker, _memory_limitation);
if (_row_block_allocator == nullptr) {
LOG(FATAL) << "failed to malloc RowBlockAllocator. size=" << sizeof(RowBlockAllocator);
return OLAP_ERR_INPUT_PARAMETER_ERROR;
@ -1114,7 +1125,7 @@ OLAPStatus SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
// src_rowsets to store the rowset generated by internal sorting
std::vector<RowsetSharedPtr> src_rowsets;
DeferOp defer([&]() {
Defer defer{[&]() {
// remove the intermediate rowsets generated by internal sorting
for (auto& row_set : src_rowsets) {
StorageEngine::instance()->add_unused_rowset(row_set);
@ -1125,7 +1136,7 @@ OLAPStatus SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
}
row_block_arr.clear();
});
}};
_temp_delta_versions.first = _temp_delta_versions.second;
@ -1349,10 +1360,10 @@ bool SchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_row
return true;
}
SchemaChangeHandler::SchemaChangeHandler() : _mem_tracker(MemTracker::CreateTracker(-1, "SchemaChange")) {
REGISTER_HOOK_METRIC(schema_change_mem_consumption, [this]() {
return _mem_tracker->consumption();
});
SchemaChangeHandler::SchemaChangeHandler()
: _mem_tracker(MemTracker::CreateTracker(-1, "SchemaChange")) {
REGISTER_HOOK_METRIC(schema_change_mem_consumption,
[this]() { return _mem_tracker->consumption(); });
}
SchemaChangeHandler::~SchemaChangeHandler() {
@ -1644,8 +1655,8 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tabl
size_t memory_limitation = config::memory_limitation_per_thread_for_schema_change;
LOG(INFO) << "doing schema change with sorting for base_tablet "
<< base_tablet->full_name();
sc_procedure = new (nothrow)
SchemaChangeWithSorting(rb_changer, _mem_tracker, memory_limitation * 1024 * 1024 * 1024);
sc_procedure = new (nothrow) SchemaChangeWithSorting(
rb_changer, _mem_tracker, memory_limitation * 1024 * 1024 * 1024);
} else if (sc_directly) {
LOG(INFO) << "doing schema change directly for base_tablet " << base_tablet->full_name();
sc_procedure = new (nothrow) SchemaChangeDirectly(rb_changer, _mem_tracker);
@ -1852,8 +1863,8 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
size_t memory_limitation = config::memory_limitation_per_thread_for_schema_change;
LOG(INFO) << "doing schema change with sorting for base_tablet "
<< sc_params.base_tablet->full_name();
sc_procedure = new (nothrow)
SchemaChangeWithSorting(rb_changer, _mem_tracker, memory_limitation * 1024 * 1024 * 1024);
sc_procedure = new (nothrow) SchemaChangeWithSorting(
rb_changer, _mem_tracker, memory_limitation * 1024 * 1024 * 1024);
} else if (sc_directly) {
LOG(INFO) << "doing schema change directly for base_tablet "
<< sc_params.base_tablet->full_name();

@ -44,8 +44,7 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
// conf has to be deleted finally
auto conf_deleter = [conf]() { delete conf; };
DeferOp delete_conf(std::bind<void>(conf_deleter));
Defer delete_conf{[conf]() { delete conf; }};
std::stringstream ss;
ss << BackendOptions::get_localhost() << "_";
@ -146,11 +145,10 @@ Status KafkaDataConsumer::assign_topic_partitions(
<< " assign topic partitions: " << topic << ", " << ss.str();
// delete TopicPartition finally
auto tp_deleter = [&topic_partitions]() {
Defer delete_tp{[&topic_partitions]() {
std::for_each(topic_partitions.begin(), topic_partitions.end(),
[](RdKafka::TopicPartition* tp1) { delete tp1; });
};
DeferOp delete_tp(std::bind<void>(tp_deleter));
}};
// assign partition
RdKafka::ErrorCode err = _k_consumer->assign(topic_partitions);
@ -238,8 +236,7 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids) {
// create topic conf
RdKafka::Conf* tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
auto conf_deleter = [tconf]() { delete tconf; };
DeferOp delete_conf(std::bind<void>(conf_deleter));
Defer delete_conf{[tconf]() { delete tconf; }};
// create topic
std::string errstr;
@ -250,8 +247,8 @@ Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
auto topic_deleter = [topic]() { delete topic; };
DeferOp delete_topic(std::bind<void>(topic_deleter));
Defer delete_topic{[topic]() { delete topic; }};
// get topic metadata
RdKafka::Metadata* metadata = nullptr;
@ -263,8 +260,8 @@ Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
auto meta_deleter = [metadata]() { delete metadata; };
DeferOp delete_meta(std::bind<void>(meta_deleter));
Defer delete_meta{[metadata]() { delete metadata; }};
// get partition ids
RdKafka::Metadata::TopicMetadataIterator it;

@ -293,11 +293,10 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool
_data_consumer_pool.return_consumer(consumer);
// delete TopicPartition finally
auto tp_deleter = [&topic_partitions]() {
Defer delete_tp{[&topic_partitions]() {
std::for_each(topic_partitions.begin(), topic_partitions.end(),
[](RdKafka::TopicPartition* tp1) { delete tp1; });
};
DeferOp delete_tp(std::bind<void>(tp_deleter));
}};
} break;
default:
return;

@ -188,7 +188,7 @@ private:
// do merge from sender queue data
_status_backup = _sorted_run(&_input_row_batch_backup);
_backup_ready = true;
DeferOp defer_op([this]() { _batch_prepared_cv.notify_one(); });
Defer defer_op{[this]() { _batch_prepared_cv.notify_one(); }};
if (!_status_backup.ok() || _input_row_batch_backup == nullptr || _cancel) {
if (!_status_backup.ok()) _input_row_batch_backup = nullptr;

@ -23,15 +23,22 @@
namespace doris {
// This class is used to defer a function when this object is deconstruct
class DeferOp {
// A Better Defer operator #5576
// for C++17
// Defer defer {[]{ call something }};
//
// for C++11
// auto op = [] {};
// Defer<decltype(op)> defer(op);
template <class T>
class Defer {
public:
typedef std::function<void()> DeferFunction;
DeferOp(const DeferFunction& func) : _func(func) {}
~DeferOp() { _func(); };
Defer(T& closure) : _closure(closure) {}
Defer(T&& closure) : _closure(std::move(closure)) {}
~Defer() { _closure(); }
private:
DeferFunction _func;
T _closure;
};
} // namespace doris
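The usage comment above is terse; here is an expanded, hedged illustration of the two modes it names. The template is copied locally so the sketch compiles on its own, and the function names are mine:

```cpp
#include <cstdio>
#include <utility>

// Local copy of the template from the hunk above, so this sketch is self-contained.
template <class T>
class Defer {
public:
    Defer(T& closure) : _closure(closure) {}
    Defer(T&& closure) : _closure(std::move(closure)) {}
    ~Defer() { _closure(); }

private:
    T _closure;
};

void cpp17_usage() {
    // C++17: class template argument deduction infers the closure type.
    Defer defer{[]() { std::puts("runs when cpp17_usage returns"); }};
    std::puts("body of cpp17_usage");
}

void cpp11_usage() {
    // C++11: spell the closure type out with decltype and name the guard.
    auto op = []() { std::puts("runs when cpp11_usage returns"); };
    Defer<decltype(op)> defer(op);
    std::puts("body of cpp11_usage");
}

int main() {
    cpp17_usage();
    cpp11_usage();
    return 0;
}
```

Note that in the C++11 mode the guard must be named: writing `Defer<decltype(op)>(op);` as a bare expression would create a temporary that fires the closure immediately.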

@ -26,6 +26,7 @@
#include <algorithm>
#include <filesystem>
#include <iomanip>
#include <memory>
#include <sstream>
#include "env/env.h"
@ -240,16 +241,15 @@ Status FileUtils::copy_file(const std::string& src_path, const std::string& dest
}
const int64_t BUF_SIZE = 8192;
char* buf = new char[BUF_SIZE];
DeferOp free_buf(std::bind<void>(std::default_delete<char[]>(), buf));
std::unique_ptr<char[]> buf = std::make_unique<char[]>(BUF_SIZE);
int64_t src_length = src_file.length();
int64_t offset = 0;
while (src_length > 0) {
int64_t to_read = BUF_SIZE < src_length ? BUF_SIZE : src_length;
if (OLAP_SUCCESS != (src_file.pread(buf, to_read, offset))) {
if (OLAP_SUCCESS != (src_file.pread(buf.get(), to_read, offset))) {
return Status::InternalError("Internal Error");
}
if (OLAP_SUCCESS != (dest_file.pwrite(buf, to_read, offset))) {
if (OLAP_SUCCESS != (dest_file.pwrite(buf.get(), to_read, offset))) {
return Status::InternalError("Internal Error");
}
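This hunk drops the defer entirely: a std::unique_ptr<char[]> ties the scratch buffer's lifetime to the scope, so no explicit cleanup hook is needed. A minimal sketch of the same chunked-copy pattern under that assumption (the memcpy calls stand in for the real pread/pwrite file calls, and copy_in_chunks is a hypothetical name):

```cpp
#include <cstdint>
#include <cstring>
#include <memory>

// Copies src_length bytes in fixed-size chunks through an RAII-owned scratch buffer.
bool copy_in_chunks(const char* src, char* dest, int64_t src_length) {
    const int64_t BUF_SIZE = 8192;
    std::unique_ptr<char[]> buf = std::make_unique<char[]>(BUF_SIZE);
    int64_t offset = 0;
    while (src_length > 0) {
        int64_t to_read = BUF_SIZE < src_length ? BUF_SIZE : src_length;
        std::memcpy(buf.get(), src + offset, to_read);   // stands in for pread
        std::memcpy(dest + offset, buf.get(), to_read);  // stands in for pwrite
        offset += to_read;
        src_length -= to_read;
    }
    return true; // buf is released automatically when it goes out of scope
}
```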

@ -70,7 +70,7 @@ Status MysqlLoadErrorHub::write_mysql() {
return st;
}
DeferOp close_mysql_conn(std::bind<void>(&mysql_close, my_conn));
Defer close_mysql_conn{[=]() { mysql_close(my_conn); }};
Status status;
std::stringstream sql_stream;