From 5fed34fcfe296e312653a3a30afffc5a13b9e854 Mon Sep 17 00:00:00 2001 From: stdpain <34912776+stdpain@users.noreply.github.com> Date: Wed, 12 May 2021 10:37:23 +0800 Subject: [PATCH] [optimize] provide a better defer operator (#5706) --- be/src/olap/schema_change.cpp | 67 +++++++++++-------- be/src/runtime/routine_load/data_consumer.cpp | 19 +++--- .../routine_load_task_executor.cpp | 5 +- be/src/runtime/sorted_run_merger.cc | 2 +- be/src/util/defer_op.h | 19 ++++-- be/src/util/file_utils.cpp | 8 +-- be/src/util/mysql_load_error_hub.cpp | 2 +- 7 files changed, 68 insertions(+), 54 deletions(-) diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 43d3a2a113..e7388977c1 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -74,8 +74,8 @@ public: explicit RowBlockMerger(TabletSharedPtr tablet); virtual ~RowBlockMerger(); - bool merge(const std::vector& row_block_arr, RowsetWriter* rowset_writer, std::shared_ptr parent, - uint64_t* merged_rows); + bool merge(const std::vector& row_block_arr, RowsetWriter* rowset_writer, + std::shared_ptr parent, uint64_t* merged_rows); private: struct MergeElement { @@ -713,7 +713,8 @@ bool RowBlockSorter::sort(RowBlock** row_block) { return true; } -RowBlockAllocator::RowBlockAllocator(const TabletSchema& tablet_schema, std::shared_ptr parent, size_t memory_limitation) +RowBlockAllocator::RowBlockAllocator(const TabletSchema& tablet_schema, + std::shared_ptr parent, size_t memory_limitation) : _tablet_schema(tablet_schema), _mem_tracker(MemTracker::CreateTracker(-1, "RowBlockAllocator", parent, false)), _row_len(tablet_schema.row_size()), @@ -723,16 +724,18 @@ RowBlockAllocator::RowBlockAllocator(const TabletSchema& tablet_schema, std::sha RowBlockAllocator::~RowBlockAllocator() { if (_mem_tracker->consumption() != 0) { - LOG(WARNING) << "memory lost in RowBlockAllocator. memory_size=" << _mem_tracker->consumption(); + LOG(WARNING) << "memory lost in RowBlockAllocator. memory_size=" + << _mem_tracker->consumption(); } } OLAPStatus RowBlockAllocator::allocate(RowBlock** row_block, size_t num_rows, bool null_supported) { size_t row_block_size = _row_len * num_rows; - if (_memory_limitation > 0 && _mem_tracker->consumption() + row_block_size > _memory_limitation) { + if (_memory_limitation > 0 && + _mem_tracker->consumption() + row_block_size > _memory_limitation) { LOG(WARNING) << "RowBlockAllocator::alocate() memory exceeded. " - << "m_memory_allocated=" << _mem_tracker->consumption(); + << "m_memory_allocated=" << _mem_tracker->consumption(); *row_block = nullptr; return OLAP_SUCCESS; } @@ -751,7 +754,8 @@ OLAPStatus RowBlockAllocator::allocate(RowBlock** row_block, size_t num_rows, bo _mem_tracker->Consume(row_block_size); VLOG_NOTICE << "RowBlockAllocator::allocate() this=" << this << ", num_rows=" << num_rows - << ", m_memory_allocated=" << _mem_tracker->consumption() << ", row_block_addr=" << *row_block; + << ", m_memory_allocated=" << _mem_tracker->consumption() + << ", row_block_addr=" << *row_block; return OLAP_SUCCESS; } @@ -765,7 +769,8 @@ void RowBlockAllocator::release(RowBlock* row_block) { VLOG_NOTICE << "RowBlockAllocator::release() this=" << this << ", num_rows=" << row_block->capacity() - << ", m_memory_allocated=" << _mem_tracker->consumption() << ", row_block_addr=" << row_block; + << ", m_memory_allocated=" << _mem_tracker->consumption() + << ", row_block_addr=" << row_block; delete row_block; } @@ -773,11 +778,12 @@ RowBlockMerger::RowBlockMerger(TabletSharedPtr tablet) : _tablet(tablet) {} RowBlockMerger::~RowBlockMerger() {} -bool RowBlockMerger::merge(const std::vector& row_block_arr, RowsetWriter* rowset_writer, std::shared_ptr parent, - uint64_t* merged_rows) { +bool RowBlockMerger::merge(const std::vector& row_block_arr, RowsetWriter* rowset_writer, + std::shared_ptr parent, uint64_t* merged_rows) { uint64_t tmp_merged_rows = 0; RowCursor row_cursor; - std::shared_ptr tracker(MemTracker::CreateTracker(-1, "RowBlockMerger", parent, false)); + std::shared_ptr tracker( + MemTracker::CreateTracker(-1, "RowBlockMerger", parent, false)); std::unique_ptr mem_pool(new MemPool(tracker.get())); std::unique_ptr agg_object_pool(new ObjectPool()); if (row_cursor.init(_tablet->tablet_schema()) != OLAP_SUCCESS) { @@ -898,8 +904,12 @@ OLAPStatus LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader, return status; } -SchemaChangeDirectly::SchemaChangeDirectly(const RowBlockChanger& row_block_changer, std::shared_ptr mem_tracker) - : SchemaChange(mem_tracker), _row_block_changer(row_block_changer), _row_block_allocator(nullptr), _cursor(nullptr) {} +SchemaChangeDirectly::SchemaChangeDirectly(const RowBlockChanger& row_block_changer, + std::shared_ptr mem_tracker) + : SchemaChange(mem_tracker), + _row_block_changer(row_block_changer), + _row_block_allocator(nullptr), + _cursor(nullptr) {} SchemaChangeDirectly::~SchemaChangeDirectly() { VLOG_NOTICE << "~SchemaChangeDirectly()"; @@ -920,7 +930,7 @@ bool SchemaChangeDirectly::_write_row_block(RowsetWriter* rowset_writer, RowBloc } OLAPStatus reserve_block(std::unique_ptr* block_handle_ptr, int row_num, - RowBlockAllocator* allocator) { + RowBlockAllocator* allocator) { auto& block_handle = *block_handle_ptr; if (block_handle == nullptr || block_handle->capacity() < row_num) { // release old block and alloc new block @@ -1010,7 +1020,7 @@ OLAPStatus SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, res = _row_block_changer.change_row_block(ref_row_block, rowset_reader->version().second, new_row_block.get(), &filtered_rows); RETURN_NOT_OK_LOG(res, "failed to change data in row block."); - + // rows filtered by delete handler one by one add_filtered_rows(filtered_rows); @@ -1053,7 +1063,8 @@ OLAPStatus SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, return res; } -SchemaChangeWithSorting::SchemaChangeWithSorting(const RowBlockChanger& row_block_changer, std::shared_ptr mem_tracker, +SchemaChangeWithSorting::SchemaChangeWithSorting(const RowBlockChanger& row_block_changer, + std::shared_ptr mem_tracker, size_t memory_limitation) : SchemaChange(mem_tracker), _row_block_changer(row_block_changer), @@ -1078,8 +1089,8 @@ OLAPStatus SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader, TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) { if (_row_block_allocator == nullptr) { - _row_block_allocator = - new (nothrow) RowBlockAllocator(new_tablet->tablet_schema(), _mem_tracker, _memory_limitation); + _row_block_allocator = new (nothrow) + RowBlockAllocator(new_tablet->tablet_schema(), _mem_tracker, _memory_limitation); if (_row_block_allocator == nullptr) { LOG(FATAL) << "failed to malloc RowBlockAllocator. size=" << sizeof(RowBlockAllocator); return OLAP_ERR_INPUT_PARAMETER_ERROR; @@ -1114,7 +1125,7 @@ OLAPStatus SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader, // src_rowsets to store the rowset generated by internal sorting std::vector src_rowsets; - DeferOp defer([&]() { + Defer defer{[&]() { // remove the intermediate rowsets generated by internal sorting for (auto& row_set : src_rowsets) { StorageEngine::instance()->add_unused_rowset(row_set); @@ -1125,7 +1136,7 @@ OLAPStatus SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader, } row_block_arr.clear(); - }); + }}; _temp_delta_versions.first = _temp_delta_versions.second; @@ -1349,10 +1360,10 @@ bool SchemaChangeWithSorting::_external_sorting(vector& src_row return true; } -SchemaChangeHandler::SchemaChangeHandler() : _mem_tracker(MemTracker::CreateTracker(-1, "SchemaChange")) { - REGISTER_HOOK_METRIC(schema_change_mem_consumption, [this]() { - return _mem_tracker->consumption(); - }); +SchemaChangeHandler::SchemaChangeHandler() + : _mem_tracker(MemTracker::CreateTracker(-1, "SchemaChange")) { + REGISTER_HOOK_METRIC(schema_change_mem_consumption, + [this]() { return _mem_tracker->consumption(); }); } SchemaChangeHandler::~SchemaChangeHandler() { @@ -1644,8 +1655,8 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tabl size_t memory_limitation = config::memory_limitation_per_thread_for_schema_change; LOG(INFO) << "doing schema change with sorting for base_tablet " << base_tablet->full_name(); - sc_procedure = new (nothrow) - SchemaChangeWithSorting(rb_changer, _mem_tracker, memory_limitation * 1024 * 1024 * 1024); + sc_procedure = new (nothrow) SchemaChangeWithSorting( + rb_changer, _mem_tracker, memory_limitation * 1024 * 1024 * 1024); } else if (sc_directly) { LOG(INFO) << "doing schema change directly for base_tablet " << base_tablet->full_name(); sc_procedure = new (nothrow) SchemaChangeDirectly(rb_changer, _mem_tracker); @@ -1852,8 +1863,8 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa size_t memory_limitation = config::memory_limitation_per_thread_for_schema_change; LOG(INFO) << "doing schema change with sorting for base_tablet " << sc_params.base_tablet->full_name(); - sc_procedure = new (nothrow) - SchemaChangeWithSorting(rb_changer, _mem_tracker, memory_limitation * 1024 * 1024 * 1024); + sc_procedure = new (nothrow) SchemaChangeWithSorting( + rb_changer, _mem_tracker, memory_limitation * 1024 * 1024 * 1024); } else if (sc_directly) { LOG(INFO) << "doing schema change directly for base_tablet " << sc_params.base_tablet->full_name(); diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp index 157f69abb4..301ad246b0 100644 --- a/be/src/runtime/routine_load/data_consumer.cpp +++ b/be/src/runtime/routine_load/data_consumer.cpp @@ -44,8 +44,7 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) { RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); // conf has to be deleted finally - auto conf_deleter = [conf]() { delete conf; }; - DeferOp delete_conf(std::bind(conf_deleter)); + Defer delete_conf{[conf]() { delete conf; }}; std::stringstream ss; ss << BackendOptions::get_localhost() << "_"; @@ -146,11 +145,10 @@ Status KafkaDataConsumer::assign_topic_partitions( << " assign topic partitions: " << topic << ", " << ss.str(); // delete TopicPartition finally - auto tp_deleter = [&topic_partitions]() { + Defer delete_tp{[&topic_partitions]() { std::for_each(topic_partitions.begin(), topic_partitions.end(), [](RdKafka::TopicPartition* tp1) { delete tp1; }); - }; - DeferOp delete_tp(std::bind(tp_deleter)); + }}; // assign partition RdKafka::ErrorCode err = _k_consumer->assign(topic_partitions); @@ -238,8 +236,7 @@ Status KafkaDataConsumer::group_consume(BlockingQueue* queue, Status KafkaDataConsumer::get_partition_meta(std::vector* partition_ids) { // create topic conf RdKafka::Conf* tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); - auto conf_deleter = [tconf]() { delete tconf; }; - DeferOp delete_conf(std::bind(conf_deleter)); + Defer delete_conf{[tconf]() { delete tconf; }}; // create topic std::string errstr; @@ -250,8 +247,8 @@ Status KafkaDataConsumer::get_partition_meta(std::vector* partition_ids LOG(WARNING) << ss.str(); return Status::InternalError(ss.str()); } - auto topic_deleter = [topic]() { delete topic; }; - DeferOp delete_topic(std::bind(topic_deleter)); + + Defer delete_topic{[topic]() { delete topic; }}; // get topic metadata RdKafka::Metadata* metadata = nullptr; @@ -263,8 +260,8 @@ Status KafkaDataConsumer::get_partition_meta(std::vector* partition_ids LOG(WARNING) << ss.str(); return Status::InternalError(ss.str()); } - auto meta_deleter = [metadata]() { delete metadata; }; - DeferOp delete_meta(std::bind(meta_deleter)); + + Defer delete_meta{[metadata]() { delete metadata; }}; // get partition ids RdKafka::Metadata::TopicMetadataIterator it; diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index 6d40c9db1f..ed0abc970c 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -293,11 +293,10 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool _data_consumer_pool.return_consumer(consumer); // delete TopicPartition finally - auto tp_deleter = [&topic_partitions]() { + Defer delete_tp{[&topic_partitions]() { std::for_each(topic_partitions.begin(), topic_partitions.end(), [](RdKafka::TopicPartition* tp1) { delete tp1; }); - }; - DeferOp delete_tp(std::bind(tp_deleter)); + }}; } break; default: return; diff --git a/be/src/runtime/sorted_run_merger.cc b/be/src/runtime/sorted_run_merger.cc index 8b6f2839ec..de5805c072 100644 --- a/be/src/runtime/sorted_run_merger.cc +++ b/be/src/runtime/sorted_run_merger.cc @@ -188,7 +188,7 @@ private: // do merge from sender queue data _status_backup = _sorted_run(&_input_row_batch_backup); _backup_ready = true; - DeferOp defer_op([this]() { _batch_prepared_cv.notify_one(); }); + Defer defer_op{[this]() { _batch_prepared_cv.notify_one(); }}; if (!_status_backup.ok() || _input_row_batch_backup == nullptr || _cancel) { if (!_status_backup.ok()) _input_row_batch_backup = nullptr; diff --git a/be/src/util/defer_op.h b/be/src/util/defer_op.h index 467ca9a921..544e151ef6 100644 --- a/be/src/util/defer_op.h +++ b/be/src/util/defer_op.h @@ -23,15 +23,22 @@ namespace doris { // This class is used to defer a function when this object is deconstruct -class DeferOp { +// A Better Defer operator #5576 +// for C++17 +// Defer defer {[]{ call something }}; +// +// for C++11 +// auto op = [] {}; +// Defer> (op); +template +class Defer { public: - typedef std::function DeferFunction; - DeferOp(const DeferFunction& func) : _func(func) {} - - ~DeferOp() { _func(); }; + Defer(T& closure) : _closure(closure) {} + Defer(T&& closure) : _closure(std::move(closure)) {} + ~Defer() { _closure(); } private: - DeferFunction _func; + T _closure; }; } // namespace doris diff --git a/be/src/util/file_utils.cpp b/be/src/util/file_utils.cpp index 8825226682..fd934b5508 100644 --- a/be/src/util/file_utils.cpp +++ b/be/src/util/file_utils.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include "env/env.h" @@ -240,16 +241,15 @@ Status FileUtils::copy_file(const std::string& src_path, const std::string& dest } const int64_t BUF_SIZE = 8192; - char* buf = new char[BUF_SIZE]; - DeferOp free_buf(std::bind(std::default_delete(), buf)); + std::unique_ptr buf = std::make_unique(BUF_SIZE); int64_t src_length = src_file.length(); int64_t offset = 0; while (src_length > 0) { int64_t to_read = BUF_SIZE < src_length ? BUF_SIZE : src_length; - if (OLAP_SUCCESS != (src_file.pread(buf, to_read, offset))) { + if (OLAP_SUCCESS != (src_file.pread(buf.get(), to_read, offset))) { return Status::InternalError("Internal Error"); } - if (OLAP_SUCCESS != (dest_file.pwrite(buf, to_read, offset))) { + if (OLAP_SUCCESS != (dest_file.pwrite(buf.get(), to_read, offset))) { return Status::InternalError("Internal Error"); } diff --git a/be/src/util/mysql_load_error_hub.cpp b/be/src/util/mysql_load_error_hub.cpp index 17a387c714..920ceead2b 100644 --- a/be/src/util/mysql_load_error_hub.cpp +++ b/be/src/util/mysql_load_error_hub.cpp @@ -70,7 +70,7 @@ Status MysqlLoadErrorHub::write_mysql() { return st; } - DeferOp close_mysql_conn(std::bind(&mysql_close, my_conn)); + Defer close_mysql_conn{[=]() { mysql_close(my_conn); }}; Status status; std::stringstream sql_stream;