diff --git a/be/src/runtime/data_spliter.cpp b/be/src/runtime/data_spliter.cpp index ae889e6b4b..20f4137446 100644 --- a/be/src/runtime/data_spliter.cpp +++ b/be/src/runtime/data_spliter.cpp @@ -288,9 +288,13 @@ Status DataSpliter::close(RuntimeState* state, Status close_status) { is_ok = false; err_status = status; } - iter.second->reset(); + iter.second->clear(); } } + } else { + for (const auto& iter : _batch_map) { + iter.second->clear(); + } } // finish sink for (const auto& iter : _dpp_sink_vec) { diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp index 616581e50c..c2f9c9e9f2 100644 --- a/be/src/runtime/row_batch.cpp +++ b/be/src/runtime/row_batch.cpp @@ -166,7 +166,11 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch, } } -RowBatch::~RowBatch() { +void RowBatch::clear() { + if (_cleared) { + return; + } + _tuple_data_pool->free_all(); for (int i = 0; i < _io_buffers.size(); ++i) { _io_buffers[i]->return_buffer(); @@ -181,6 +185,11 @@ RowBatch::~RowBatch() { _mem_tracker->release(_tuple_ptrs_size); _tuple_ptrs = NULL; } + _cleared = true; +} + +RowBatch::~RowBatch() { + clear(); } int RowBatch::serialize(TRowBatch* output_batch) { diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h index fdf650604e..3ed9cea8a5 100644 --- a/be/src/runtime/row_batch.h +++ b/be/src/runtime/row_batch.h @@ -99,6 +99,9 @@ public: // - buffer handles from the io mgr virtual ~RowBatch(); + // used to c + void clear(); + static const int INVALID_ROW_INDEX = -1; // Add n rows of tuple pointers after the last committed row and return its index. @@ -486,6 +489,7 @@ private: std::string _compression_scratch; int _scanner_id; + bool _cleared = false; }; /// Macros for iterating through '_row_batch', starting at '_start_row_idx'.