diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index da5d220a24..c770e096ee 100644 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -112,7 +112,6 @@ DataDir::DataDir(const std::string& path, int64_t capacity_bytes, DataDir::~DataDir() { DorisMetrics::instance()->metric_registry()->deregister_entity(_data_dir_metric_entity); - delete _id_generator; delete _meta; } diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h index 811118c1c9..75fde6dc06 100644 --- a/be/src/olap/data_dir.h +++ b/be/src/olap/data_dir.h @@ -195,7 +195,6 @@ private: static const uint32_t MAX_SHARD_NUM = 1024; OlapMeta* _meta = nullptr; - RowsetIdGenerator* _id_generator = nullptr; std::shared_ptr _data_dir_metric_entity; IntGauge* disks_total_capacity = nullptr; diff --git a/be/src/olap/utils.h b/be/src/olap/utils.h index 0e887e9639..d078d260f0 100644 --- a/be/src/olap/utils.h +++ b/be/src/olap/utils.h @@ -95,11 +95,6 @@ Status split_string(const std::string& base, const T separator, std::vector -void _destruct_object(const void* obj, void*) { - delete ((const T*)obj); -} - uint32_t olap_adler32_init(); uint32_t olap_adler32(uint32_t adler, const char* buf, size_t len); diff --git a/be/src/runtime/query_statistics.cpp b/be/src/runtime/query_statistics.cpp index e12e4e08c5..de95070418 100644 --- a/be/src/runtime/query_statistics.cpp +++ b/be/src/runtime/query_statistics.cpp @@ -26,19 +26,6 @@ namespace doris { -void NodeStatistics::merge(const NodeStatistics& other) { - peak_memory_bytes = std::max(other.peak_memory_bytes, peak_memory_bytes); -} - -void NodeStatistics::to_pb(PNodeStatistics* node_statistics) { - DCHECK(node_statistics != nullptr); - node_statistics->set_peak_memory_bytes(peak_memory_bytes); -} - -void NodeStatistics::from_pb(const PNodeStatistics& node_statistics) { - peak_memory_bytes = node_statistics.peak_memory_bytes(); -} - void QueryStatistics::merge(const QueryStatistics& other) { scan_rows += other.scan_rows.load(std::memory_order_relaxed); scan_bytes += other.scan_bytes.load(std::memory_order_relaxed); @@ -55,12 +42,6 @@ void QueryStatistics::merge(const QueryStatistics& other) { if (other_memory_used > 0) { this->current_used_memory_bytes = other_memory_used; } - - for (auto& other_node_statistics : other._nodes_statistics_map) { - int64_t node_id = other_node_statistics.first; - auto node_statistics = add_nodes_statistics(node_id); - node_statistics->merge(*other_node_statistics.second); - } } void QueryStatistics::to_pb(PQueryStatistics* statistics) { @@ -70,11 +51,6 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) { statistics->set_cpu_ms(cpu_nanos / NANOS_PER_MILLIS); statistics->set_returned_rows(returned_rows); statistics->set_max_peak_memory_bytes(max_peak_memory_bytes); - for (auto iter = _nodes_statistics_map.begin(); iter != _nodes_statistics_map.end(); ++iter) { - auto node_statistics = statistics->add_nodes_statistics(); - node_statistics->set_node_id(iter->first); - iter->second->to_pb(node_statistics); - } } void QueryStatistics::to_thrift(TQueryStatistics* statistics) const { @@ -94,21 +70,6 @@ void QueryStatistics::from_pb(const PQueryStatistics& statistics) { scan_rows = statistics.scan_rows(); scan_bytes = statistics.scan_bytes(); cpu_nanos = statistics.cpu_ms() * NANOS_PER_MILLIS; - for (auto& p_node_statistics : statistics.nodes_statistics()) { - int64_t node_id = p_node_statistics.node_id(); - auto node_statistics = add_nodes_statistics(node_id); - node_statistics->from_pb(p_node_statistics); - } -} - -int64_t QueryStatistics::calculate_max_peak_memory_bytes() { - int64_t max_peak_memory = 0; - for (auto iter = _nodes_statistics_map.begin(); iter != _nodes_statistics_map.end(); ++iter) { - if (max_peak_memory < iter->second->peak_memory_bytes) { - max_peak_memory = iter->second->peak_memory_bytes; - } - } - return max_peak_memory; } void QueryStatistics::merge(QueryStatisticsRecvr* recvr) { @@ -123,16 +84,7 @@ void QueryStatistics::merge(QueryStatisticsRecvr* recvr, int sender_id) { } } -void QueryStatistics::clearNodeStatistics() { - for (auto& pair : _nodes_statistics_map) { - delete pair.second; - } - _nodes_statistics_map.clear(); -} - -QueryStatistics::~QueryStatistics() { - clearNodeStatistics(); -} +QueryStatistics::~QueryStatistics() {} void QueryStatisticsRecvr::insert(const PQueryStatistics& statistics, int sender_id) { std::lock_guard l(_lock); diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h index 8a18a152e4..a9f6e192ec 100644 --- a/be/src/runtime/query_statistics.h +++ b/be/src/runtime/query_statistics.h @@ -35,25 +35,6 @@ class QueryStatisticsRecvr; class PNodeStatistics; class PQueryStatistics; -class NodeStatistics { -public: - NodeStatistics() : peak_memory_bytes(0) {} - - void set_peak_memory(int64_t peak_memory) { - this->peak_memory_bytes = std::max(this->peak_memory_bytes, peak_memory); - } - - void merge(const NodeStatistics& other); - - void to_pb(PNodeStatistics* node_statistics); - - void from_pb(const PNodeStatistics& node_statistics); - -private: - friend class QueryStatistics; - int64_t peak_memory_bytes; -}; - // This is responsible for collecting query statistics, usually it consists of // two parts, one is current fragment or plan's statistics, the other is sub fragment // or plan's statistics and QueryStatisticsRecvr is responsible for collecting it. @@ -92,18 +73,6 @@ public: this->shuffle_send_rows.fetch_add(delta_rows, std::memory_order_relaxed); } - NodeStatistics* add_nodes_statistics(int64_t node_id) { - NodeStatistics* nodeStatistics = nullptr; - auto iter = _nodes_statistics_map.find(node_id); - if (iter == _nodes_statistics_map.end()) { - nodeStatistics = new NodeStatistics; - _nodes_statistics_map[node_id] = nodeStatistics; - } else { - nodeStatistics = iter->second; - } - return nodeStatistics; - } - void set_returned_rows(int64_t num_rows) { this->returned_rows = num_rows; } void set_max_peak_memory_bytes(int64_t max_peak_memory_bytes) { @@ -117,8 +86,6 @@ public: void merge(QueryStatisticsRecvr* recvr); void merge(QueryStatisticsRecvr* recvr, int sender_id); - // Get the maximum value from the peak memory collected by all node statistics - int64_t calculate_max_peak_memory_bytes(); void clearNodeStatistics(); @@ -159,9 +126,6 @@ private: // Maximum memory peak for all backends. // only set once by result sink when closing. std::atomic max_peak_memory_bytes; - // The statistics of the query on each backend. - using NodeStatisticsMap = std::unordered_map; - NodeStatisticsMap _nodes_statistics_map; bool _collected = false; std::atomic current_used_memory_bytes; diff --git a/be/src/vec/sink/autoinc_buffer.h b/be/src/vec/sink/autoinc_buffer.h index a8753a473d..80a469596f 100644 --- a/be/src/vec/sink/autoinc_buffer.h +++ b/be/src/vec/sink/autoinc_buffer.h @@ -19,6 +19,7 @@ #include #include "common/config.h" +#include "common/factory_creator.h" #include "common/status.h" #include "util/threadpool.h" @@ -52,6 +53,7 @@ struct AutoIncIDAllocator { }; class AutoIncIDBuffer { + ENABLE_FACTORY_CREATOR(AutoIncIDBuffer); // GenericReader::_MIN_BATCH_SIZE = 4064 static constexpr size_t MIN_BATCH_SIZE = 4064; @@ -101,29 +103,27 @@ public: .set_max_queue_size(std::numeric_limits::max()) .build(&_fetch_autoinc_id_executor)); } - ~GlobalAutoIncBuffers() { - for (auto [_, buffer] : _buffers) { - delete buffer; - } - } + ~GlobalAutoIncBuffers() = default; std::unique_ptr create_token() { return _fetch_autoinc_id_executor->new_token(ThreadPool::ExecutionMode::CONCURRENT); } - AutoIncIDBuffer* get_auto_inc_buffer(int64_t db_id, int64_t table_id, int64_t column_id) { + std::shared_ptr get_auto_inc_buffer(int64_t db_id, int64_t table_id, + int64_t column_id) { std::lock_guard lock(_mutex); auto key = std::make_tuple(db_id, table_id, column_id); auto it = _buffers.find(key); if (it == _buffers.end()) { - _buffers.emplace(std::make_pair(key, new AutoIncIDBuffer(db_id, table_id, column_id))); + _buffers.emplace(std::make_pair( + key, AutoIncIDBuffer::create_shared(db_id, table_id, column_id))); } return _buffers[{db_id, table_id, column_id}]; } private: std::unique_ptr _fetch_autoinc_id_executor; - std::map, AutoIncIDBuffer*> _buffers; + std::map, std::shared_ptr> _buffers; std::mutex _mutex; }; diff --git a/be/src/vec/sink/vtablet_block_convertor.h b/be/src/vec/sink/vtablet_block_convertor.h index 963ebd7d58..4eaaef3869 100644 --- a/be/src/vec/sink/vtablet_block_convertor.h +++ b/be/src/vec/sink/vtablet_block_convertor.h @@ -103,7 +103,7 @@ private: size_t _batch_size; std::optional _auto_inc_col_idx; - AutoIncIDBuffer* _auto_inc_id_buffer = nullptr; + std::shared_ptr _auto_inc_id_buffer = nullptr; AutoIncIDAllocator _auto_inc_id_allocator; };