diff --git a/be/src/exec/analytic_eval_node.cpp b/be/src/exec/analytic_eval_node.cpp
index bb069a17c8..65867b5ee4 100644
--- a/be/src/exec/analytic_eval_node.cpp
+++ b/be/src/exec/analytic_eval_node.cpp
@@ -753,7 +753,14 @@ Status AnalyticEvalNode::get_next_output_batch(RuntimeState* state, RowBatch* ou
         // CopyRow works as expected: input_batch tuples form a prefix of output_batch
         // tuples.
         TupleRow* dest = output_batch->get_row(output_batch->add_row());
-        input_batch.copy_row(input_batch.get_row(i), dest);
+        // input_batch comes from a buffered tuple stream, which only guarantees that
+        // its tuples stay valid within a single batch.
+        // If an ancestor node is a non-spilling blocking node (such as hash_join_node, except_node, ...),
+        // it may end up holding invalid tuple pointers,
+        // so we deep-copy the tuples into the tuple pool to ensure they are not finalized too early.
+        // reference issue #5466
+        input_batch.get_row(i)->deep_copy(dest, child(0)->row_desc().tuple_descriptors(),
+                                          output_batch->tuple_data_pool(), false);
         dest->set_tuple(num_child_tuples, _result_tuples.front().second);
 
         if (ExecNode::eval_conjuncts(ctxs, num_ctxs, dest)) {
diff --git a/be/src/exec/buffered_reader.h b/be/src/exec/buffered_reader.h
index 1eb185c13c..76002151fb 100644
--- a/be/src/exec/buffered_reader.h
+++ b/be/src/exec/buffered_reader.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include
+#include <memory>
 
 #include "common/status.h"
 #include "exec/file_reader.h"
@@ -32,6 +33,7 @@ class BufferedReader : public FileReader {
 public:
     // If the reader need the file size, set it when construct FileReader.
     // There is no other way to set the file size.
+    // BufferedReader takes ownership of the passed-in FileReader.
     BufferedReader(FileReader* reader, int64_t = 1024 * 1024);
     virtual ~BufferedReader();
 
@@ -53,7 +55,7 @@ private:
     Status _read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out);
 
 private:
-    FileReader* _reader;
+    std::unique_ptr<FileReader> _reader;
     char* _buffer;
     int64_t _buffer_size;
     int64_t _buffer_offset;
diff --git a/be/src/exec/olap_utils.h b/be/src/exec/olap_utils.h
index 3f56fed128..d4a97eadd1 100644
--- a/be/src/exec/olap_utils.h
+++ b/be/src/exec/olap_utils.h
@@ -207,6 +207,9 @@ inline SQLFilterOp to_olap_filter_type(TExprOpcode::type type, bool opposite) {
     case TExprOpcode::NE:
         return opposite ? FILTER_IN : FILTER_NOT_IN;
 
+    case TExprOpcode::EQ_FOR_NULL:
+        return FILTER_IN;
+
     default:
         VLOG_CRITICAL << "TExprOpcode: " << type;
         DCHECK(false);
diff --git a/be/src/exec/partitioned_aggregation_node.cc b/be/src/exec/partitioned_aggregation_node.cc
index bb74cdf26c..7e5d5f3eab 100644
--- a/be/src/exec/partitioned_aggregation_node.cc
+++ b/be/src/exec/partitioned_aggregation_node.cc
@@ -345,9 +345,20 @@ Status PartitionedAggregationNode::open(RuntimeState* state) {
 }
 
 Status PartitionedAggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
-    int first_row_idx = row_batch->num_rows();
-    RETURN_IF_ERROR(GetNextInternal(state, row_batch, eos));
-    RETURN_IF_ERROR(HandleOutputStrings(row_batch, first_row_idx));
+    // PartitionedAggregationNode is a spilling node: GetNextInternal reads tuples from a tuple stream
+    // and only copies the pointers into a RowBatch, so they are only guaranteed to stay valid within a single batch.
+    // If an ancestor node is a non-spilling blocking node (such as hash_join_node, except_node, ...),
+    // it may end up holding invalid tuple pointers,
+    // so we deep-copy the tuples into the tuple pool to ensure they are not finalized too early.
+    // reference issue #5466
+    // TODO: if there is no non-spilling blocking node among the ancestors, we could avoid the deep copy;
+    // a flag should indicate that this node does not need to deep-copy.
+    DCHECK_EQ(row_batch->num_rows(), 0);
+    RowBatch batch(row_batch->row_desc(), row_batch->capacity(), _mem_tracker.get());
+    int first_row_idx = batch.num_rows();
+    RETURN_IF_ERROR(GetNextInternal(state, &batch, eos));
+    RETURN_IF_ERROR(HandleOutputStrings(&batch, first_row_idx));
+    batch.deep_copy_to(row_batch);
     return Status::OK();
 }
 
diff --git a/be/src/exprs/literal.cpp b/be/src/exprs/literal.cpp
index 89269ef0b5..11b458f23f 100644
--- a/be/src/exprs/literal.cpp
+++ b/be/src/exprs/literal.cpp
@@ -141,7 +141,7 @@ FloatVal Literal::get_float_val(ExprContext* context, TupleRow* row) {
 }
 
 DoubleVal Literal::get_double_val(ExprContext* context, TupleRow* row) {
-    DCHECK_EQ(_type.type, TYPE_DOUBLE) << _type;
+    DCHECK(_type.type == TYPE_DOUBLE || _type.type == TYPE_TIME) << _type;
     return DoubleVal(_value.double_val);
 }
 
diff --git a/be/src/http/action/mini_load.cpp b/be/src/http/action/mini_load.cpp
index ea234a0092..89680eedbe 100644
--- a/be/src/http/action/mini_load.cpp
+++ b/be/src/http/action/mini_load.cpp
@@ -483,8 +483,6 @@ bool MiniLoadAction::_is_streaming(HttpRequest* req) {
         LOG(INFO) << ss.str();
         return false;
     }
-    MiniLoadCtx* mini_load_ctx = new MiniLoadCtx(true);
-    req->set_handler_ctx(mini_load_ctx);
     return true;
 }
 
diff --git a/be/src/http/action/pprof_actions.cpp b/be/src/http/action/pprof_actions.cpp
index a05bf22715..099d0848e7 100644
--- a/be/src/http/action/pprof_actions.cpp
+++ b/be/src/http/action/pprof_actions.cpp
@@ -28,6 +28,7 @@
 #include "agent/utils.h"
 #include "common/config.h"
+#include "common/object_pool.h"
 #include "gutil/strings/substitute.h"
 #include "http/ev_http_server.h"
 #include "http/http_channel.h"
@@ -281,18 +282,18 @@ void SymbolAction::handle(HttpRequest* req) {
     }
 }
 
-Status PprofActions::setup(ExecEnv* exec_env, EvHttpServer* http_server) {
+Status PprofActions::setup(ExecEnv* exec_env, EvHttpServer* http_server, ObjectPool& pool) {
     if (!config::pprof_profile_dir.empty()) {
         FileUtils::create_dir(config::pprof_profile_dir);
     }
-    http_server->register_handler(HttpMethod::GET, "/pprof/heap", new HeapAction());
-    http_server->register_handler(HttpMethod::GET, "/pprof/growth", new GrowthAction());
-    http_server->register_handler(HttpMethod::GET, "/pprof/profile", new ProfileAction());
-    http_server->register_handler(HttpMethod::GET, "/pprof/pmuprofile", new PmuProfileAction());
-    http_server->register_handler(HttpMethod::GET, "/pprof/contention", new ContentionAction());
-    http_server->register_handler(HttpMethod::GET, "/pprof/cmdline", new CmdlineAction());
-    auto action = new SymbolAction(exec_env->bfd_parser());
+    http_server->register_handler(HttpMethod::GET, "/pprof/heap", pool.add(new HeapAction()));
+    http_server->register_handler(HttpMethod::GET, "/pprof/growth", pool.add(new GrowthAction()));
+    http_server->register_handler(HttpMethod::GET, "/pprof/profile", pool.add(new ProfileAction()));
+    http_server->register_handler(HttpMethod::GET, "/pprof/pmuprofile", pool.add(new PmuProfileAction()));
+    http_server->register_handler(HttpMethod::GET, "/pprof/contention", pool.add(new ContentionAction()));
+    http_server->register_handler(HttpMethod::GET, "/pprof/cmdline", pool.add(new CmdlineAction()));
+    auto action = pool.add(new SymbolAction(exec_env->bfd_parser()));
     http_server->register_handler(HttpMethod::GET, "/pprof/symbol", action);
http_server->register_handler(HttpMethod::HEAD, "/pprof/symbol", action); http_server->register_handler(HttpMethod::POST, "/pprof/symbol", action); diff --git a/be/src/http/action/pprof_actions.h b/be/src/http/action/pprof_actions.h index d52ff6cb10..cbe89fe566 100644 --- a/be/src/http/action/pprof_actions.h +++ b/be/src/http/action/pprof_actions.h @@ -24,10 +24,11 @@ namespace doris { class EvHttpServer; class ExecEnv; +class ObjectPool; class PprofActions { public: - static Status setup(ExecEnv* exec_env, EvHttpServer* http_server); + static Status setup(ExecEnv* exec_env, EvHttpServer* http_server, ObjectPool& pool); }; } // namespace doris diff --git a/be/src/http/ev_http_server.cpp b/be/src/http/ev_http_server.cpp index e9582462bb..d5bd907b51 100644 --- a/be/src/http/ev_http_server.cpp +++ b/be/src/http/ev_http_server.cpp @@ -128,8 +128,8 @@ void EvHttpServer::stop() { { std::lock_guard lock(_event_bases_lock); for (int i = 0; i < _num_workers; ++i) { - LOG(WARNING) << "event_base_loopexit ret: " - << event_base_loopexit(_event_bases[i].get(), nullptr); + LOG(WARNING) << "event_base_loopbreak ret: " + << event_base_loopbreak(_event_bases[i].get()); } _event_bases.clear(); } diff --git a/be/src/olap/aggregate_func.h b/be/src/olap/aggregate_func.h index c3ddf3ade2..5f1c17a874 100644 --- a/be/src/olap/aggregate_func.h +++ b/be/src/olap/aggregate_func.h @@ -461,10 +461,9 @@ struct AggregateFuncTraitssize = 0; auto* hll = new HyperLogLog(*src_slice); + dst_slice->data = reinterpret_cast(hll); - mem_pool->mem_tracker()->Consume(sizeof(HyperLogLog)); - agg_pool->add(hll); } @@ -511,8 +510,6 @@ struct AggregateFuncTraitsdata = (char*)bitmap; - mem_pool->mem_tracker()->Consume(sizeof(BitmapValue)); - agg_pool->add(bitmap); } diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 7567ae6e55..20e459b257 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -37,6 +37,7 @@ #include "runtime/exec_env.h" #include "runtime/mem_pool.h" #include "runtime/mem_tracker.h" +#include "util/defer_op.h" using std::deque; using std::list; @@ -56,7 +57,8 @@ public: bool sort(RowBlock** row_block); private: - static bool _row_cursor_comparator(const RowCursor* a, const RowCursor* b) { + static bool _row_cursor_comparator(const std::unique_ptr& a, + const std::unique_ptr& b) { return compare_row(*a, *b) < 0; } @@ -447,7 +449,8 @@ OLAPStatus RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t // filter data according to delete conditions specified in DeleteData command if (is_data_left_vec[row_index] == 1) { - if (_delete_handler != nullptr && _delete_handler->is_filter_data(data_version, read_helper)) { + if (_delete_handler != nullptr && + _delete_handler->is_filter_data(data_version, read_helper)) { is_data_left_vec[row_index] = 0; (*filtered_rows)++; } @@ -480,8 +483,8 @@ OLAPStatus RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t << _schema_mapping[i].materialized_function; return OLAP_ERR_SCHEMA_CHANGE_INFO_INVALID; } - VLOG_NOTICE << "_schema_mapping[" << i - << "].materialized_function : " << _schema_mapping[i].materialized_function; + VLOG_NOTICE << "_schema_mapping[" << i << "].materialized_function : " + << _schema_mapping[i].materialized_function; for (size_t row_index = 0, new_row_index = 0; row_index < ref_block->row_block_info().row_num; ++row_index) { // No need row, need to be filter @@ -597,10 +600,11 @@ OLAPStatus RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t if (newtype < reftype) { 
VLOG_NOTICE << "type degraded while altering column. " - << "column=" << mutable_block->tablet_schema().column(i).name() - << ", origin_type=" - << ref_block->tablet_schema().column(ref_column).type() - << ", alter_type=" << mutable_block->tablet_schema().column(i).type(); + << "column=" << mutable_block->tablet_schema().column(i).name() + << ", origin_type=" + << ref_block->tablet_schema().column(ref_column).type() + << ", alter_type=" + << mutable_block->tablet_schema().column(i).type(); } } } else { @@ -671,21 +675,21 @@ bool RowBlockSorter::sort(RowBlock** row_block) { return false; } - RowBlock* temp = nullptr; - std::vector row_cursor_list((*row_block)->row_block_info().row_num, nullptr); - + std::vector> row_cursor_list; + row_cursor_list.reserve((*row_block)->row_block_info().row_num); // create an list of row cursor as long as the number of rows in data block. for (size_t i = 0; i < (*row_block)->row_block_info().row_num; ++i) { - if ((row_cursor_list[i] = new (nothrow) RowCursor()) == nullptr) { + row_cursor_list.emplace_back(new (nothrow) RowCursor()); + if (row_cursor_list[i] == nullptr) { LOG(WARNING) << "failed to malloc RowCursor. size=" << sizeof(RowCursor); - goto SORT_ERR_EXIT; + return false; } if (row_cursor_list[i]->init((*row_block)->tablet_schema()) != OLAP_SUCCESS) { - goto SORT_ERR_EXIT; + return false; } - (*row_block)->get_row(i, row_cursor_list[i]); + (*row_block)->get_row(i, row_cursor_list[i].get()); } // Must use 'std::' because this class has a function whose name is sort too @@ -700,23 +704,10 @@ bool RowBlockSorter::sort(RowBlock** row_block) { _swap_row_block->finalize(row_cursor_list.size()); - for (size_t i = 0; i < (*row_block)->row_block_info().row_num; ++i) { - SAFE_DELETE(row_cursor_list[i]); - } - // swap the row block for reducing memory allocating. - temp = *row_block; - *row_block = _swap_row_block; - _swap_row_block = temp; + std::swap(*row_block, _swap_row_block); return true; - -SORT_ERR_EXIT: - for (size_t i = 0; i < (*row_block)->row_block_info().row_num; ++i) { - SAFE_DELETE(row_cursor_list[i]); - } - - return false; } RowBlockAllocator::RowBlockAllocator(const TabletSchema& tablet_schema, size_t memory_limitation) @@ -738,7 +729,7 @@ OLAPStatus RowBlockAllocator::allocate(RowBlock** row_block, size_t num_rows, bo if (_memory_limitation > 0 && _memory_allocated + row_block_size > _memory_limitation) { VLOG_NOTICE << "RowBlockAllocator::alocate() memory exceeded. 
" - << "m_memory_allocated=" << _memory_allocated; + << "m_memory_allocated=" << _memory_allocated; *row_block = nullptr; return OLAP_SUCCESS; } @@ -757,7 +748,8 @@ OLAPStatus RowBlockAllocator::allocate(RowBlock** row_block, size_t num_rows, bo _memory_allocated += row_block_size; VLOG_NOTICE << "RowBlockAllocator::allocate() this=" << this << ", num_rows=" << num_rows - << ", m_memory_allocated=" << _memory_allocated << ", row_block_addr=" << *row_block; + << ", m_memory_allocated=" << _memory_allocated + << ", row_block_addr=" << *row_block; return OLAP_SUCCESS; } @@ -770,8 +762,8 @@ void RowBlockAllocator::release(RowBlock* row_block) { _memory_allocated -= row_block->capacity() * _row_len; VLOG_NOTICE << "RowBlockAllocator::release() this=" << this - << ", num_rows=" << row_block->capacity() - << ", m_memory_allocated=" << _memory_allocated << ", row_block_addr=" << row_block; + << ", num_rows=" << row_block->capacity() + << ", m_memory_allocated=" << _memory_allocated << ", row_block_addr=" << row_block; delete row_block; } @@ -925,6 +917,24 @@ bool SchemaChangeDirectly::_write_row_block(RowsetWriter* rowset_writer, RowBloc return true; } +OLAPStatus reserve_block(std::unique_ptr* block_handle_ptr, int row_num, + RowBlockAllocator* allocator) { + auto& block_handle = *block_handle_ptr; + if (block_handle == nullptr || block_handle->capacity() < row_num) { + // release old block and alloc new block + if (block_handle != nullptr) { + block_handle.reset(); + } + RowBlock* new_row_block = nullptr; + auto res = allocator->allocate(&new_row_block, row_num, true); + RETURN_NOT_OK_LOG(res, "failed to allocate RowBlock."); + block_handle.reset(new_row_block); + } else { + block_handle->clear(); + } + return OLAP_SUCCESS; +} + OLAPStatus SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer, TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) { @@ -973,8 +983,13 @@ OLAPStatus SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, } VLOG_NOTICE << "init writer. 
new_tablet=" << new_tablet->full_name() - << ", block_row_number=" << new_tablet->num_rows_per_row_block(); - RowBlock* new_row_block = nullptr; + << ", block_row_number=" << new_tablet->num_rows_per_row_block(); + + std::unique_ptr new_row_block(nullptr, [&](RowBlock* block) { + if (block != nullptr) { + _row_block_allocator->release(block); + } + }); // Reset filtered_rows and merged_rows statistic reset_merged_rows(); @@ -983,38 +998,24 @@ OLAPStatus SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, RowBlock* ref_row_block = nullptr; rowset_reader->next_block(&ref_row_block); while (ref_row_block != nullptr && ref_row_block->has_remaining()) { - // 注意这里强制分配和旧块等大的块(小了可能会存不下) - if (new_row_block == nullptr || - new_row_block->capacity() < ref_row_block->row_block_info().row_num) { - if (new_row_block != nullptr) { - _row_block_allocator->release(new_row_block); - new_row_block = nullptr; - } - res = _row_block_allocator->allocate(&new_row_block, - ref_row_block->row_block_info().row_num, true); - if (OLAP_SUCCESS != res) { - LOG(WARNING) << "failed to allocate RowBlock."; - goto DIRECTLY_PROCESS_ERR; - } - } else { - new_row_block->clear(); - } + // We will allocate blocks of the same size as before + // to ensure that the data can be stored + RETURN_NOT_OK(reserve_block(&new_row_block, ref_row_block->row_block_info().row_num, + _row_block_allocator)); // 将ref改为new。这一步按道理来说确实需要等大的块,但理论上和writer无关。 uint64_t filtered_rows = 0; res = _row_block_changer.change_row_block(ref_row_block, rowset_reader->version().second, - new_row_block, &filtered_rows); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "failed to change data in row block."; - goto DIRECTLY_PROCESS_ERR; - } + new_row_block.get(), &filtered_rows); + RETURN_NOT_OK_LOG(res, "failed to change data in row block."); + // rows filtered by delete handler one by one add_filtered_rows(filtered_rows); - if (!_write_row_block(rowset_writer, new_row_block)) { - LOG(WARNING) << "failed to write row block."; + if (!_write_row_block(rowset_writer, new_row_block.get())) { res = OLAP_ERR_SCHEMA_CHANGE_INFO_INVALID; - goto DIRECTLY_PROCESS_ERR; + LOG(WARNING) << "failed to write row block."; + return res; } ref_row_block->clear(); @@ -1022,8 +1023,7 @@ OLAPStatus SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, } if (OLAP_SUCCESS != rowset_writer->flush()) { - res = OLAP_ERR_ALTER_STATUS_ERR; - goto DIRECTLY_PROCESS_ERR; + return OLAP_ERR_ALTER_STATUS_ERR; } // rows filtered by zone map against delete handler @@ -1048,12 +1048,6 @@ OLAPStatus SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, << ", merged_rows=" << merged_rows() << ", filtered_rows=" << filtered_rows() << ", new_index_rows=" << rowset_writer->num_rows(); } - -DIRECTLY_PROCESS_ERR: - if (new_row_block) { - _row_block_allocator->release(new_row_block); - new_row_block = nullptr; - } return res; } @@ -1117,6 +1111,19 @@ OLAPStatus SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader, // src_rowsets to store the rowset generated by internal sorting std::vector src_rowsets; + DeferOp defer([&]() { + // remove the intermediate rowsets generated by internal sorting + for (auto& row_set : src_rowsets) { + StorageEngine::instance()->add_unused_rowset(row_set); + } + + for (auto block : row_block_arr) { + _row_block_allocator->release(block); + } + + row_block_arr.clear(); + }); + _temp_delta_versions.first = _temp_delta_versions.second; // Reset filtered_rows and merged_rows statistic @@ -1136,8 +1143,7 @@ OLAPStatus 
SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader, ref_row_block->row_block_info().row_num, true)) { LOG(WARNING) << "failed to allocate RowBlock."; - res = OLAP_ERR_INPUT_PARAMETER_ERROR; - goto SORTING_PROCESS_ERR; + return OLAP_ERR_INPUT_PARAMETER_ERROR; } if (new_row_block == nullptr) { @@ -1159,15 +1165,13 @@ OLAPStatus SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader, rowset_reader->version_hash(), new_tablet, new_rowset_type, segments_overlap, &rowset)) { LOG(WARNING) << "failed to sorting internally."; - res = OLAP_ERR_ALTER_STATUS_ERR; - goto SORTING_PROCESS_ERR; + return OLAP_ERR_ALTER_STATUS_ERR; } src_rowsets.push_back(rowset); - for (vector::iterator it = row_block_arr.begin(); it != row_block_arr.end(); - ++it) { - _row_block_allocator->release(*it); + for (auto block : row_block_arr) { + _row_block_allocator->release(block); } row_block_arr.clear(); @@ -1181,18 +1185,18 @@ OLAPStatus SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader, res = _row_block_changer.change_row_block(ref_row_block, rowset_reader->version().second, new_row_block, &filtered_rows); if (res != OLAP_SUCCESS) { + row_block_arr.push_back(new_row_block); LOG(WARNING) << "failed to change data in row block."; - goto SORTING_PROCESS_ERR; + return res; } add_filtered_rows(filtered_rows); if (new_row_block->row_block_info().row_num > 0) { if (!row_block_sorter.sort(&new_row_block)) { + row_block_arr.push_back(new_row_block); LOG(WARNING) << "failed to sort row block."; - res = OLAP_ERR_ALTER_STATUS_ERR; - OLAP_GOTO(SORTING_PROCESS_ERR); + return OLAP_ERR_ALTER_STATUS_ERR; } - row_block_arr.push_back(new_row_block); } else { LOG(INFO) << "new block num rows is: " << new_row_block->row_block_info().row_num; @@ -1217,15 +1221,13 @@ OLAPStatus SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader, rowset_reader->version_hash(), new_tablet, new_rowset_type, segments_overlap, &rowset)) { LOG(WARNING) << "failed to sorting internally."; - res = OLAP_ERR_ALTER_STATUS_ERR; - goto SORTING_PROCESS_ERR; + return OLAP_ERR_ALTER_STATUS_ERR; } src_rowsets.push_back(rowset); - for (vector::iterator it = row_block_arr.begin(); it != row_block_arr.end(); - ++it) { - _row_block_allocator->release(*it); + for (auto block : row_block_arr) { + _row_block_allocator->release(block); } row_block_arr.clear(); @@ -1244,8 +1246,7 @@ OLAPStatus SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader, } } else if (!_external_sorting(src_rowsets, new_rowset_writer, new_tablet)) { LOG(WARNING) << "failed to sorting externally."; - res = OLAP_ERR_ALTER_STATUS_ERR; - goto SORTING_PROCESS_ERR; + return OLAP_ERR_ALTER_STATUS_ERR; } add_filtered_rows(rowset_reader->filtered_rows()); @@ -1269,20 +1270,6 @@ OLAPStatus SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader, << ", merged_rows=" << merged_rows() << ", filtered_rows=" << filtered_rows() << ", new_index_rows=" << new_rowset_writer->num_rows(); } - -SORTING_PROCESS_ERR: - - // remove the intermediate rowsets generated by internal sorting - for (vector::iterator it = src_rowsets.begin(); it != src_rowsets.end(); - ++it) { - StorageEngine::instance()->add_unused_rowset(*it); - } - - for (vector::iterator it = row_block_arr.begin(); it != row_block_arr.end(); ++it) { - _row_block_allocator->release(*it); - } - - row_block_arr.clear(); return res; } @@ -1309,7 +1296,7 @@ bool SchemaChangeWithSorting::_internal_sorting(const std::vector& ro context.version_hash = version_hash; 
context.segments_overlap = segments_overlap; VLOG_NOTICE << "init rowset builder. tablet=" << new_tablet->full_name() - << ", block_row_size=" << new_tablet->num_rows_per_row_block(); + << ", block_row_size=" << new_tablet->num_rows_per_row_block(); std::unique_ptr rowset_writer; if (RowsetFactory::create_rowset_writer(context, &rowset_writer) != OLAP_SUCCESS) { @@ -1864,7 +1851,7 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa // c. 转换历史数据 for (auto& rs_reader : sc_params.ref_rowset_readers) { VLOG_TRACE << "begin to convert a history rowset. version=" << rs_reader->version().first - << "-" << rs_reader->version().second; + << "-" << rs_reader->version().second; // set status for monitor // 只要有一个new_table为running,ref table就设置为running @@ -1935,13 +1922,14 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa goto PROCESS_ALTER_EXIT; } else { VLOG_NOTICE << "register new version. tablet=" << sc_params.new_tablet->full_name() - << ", version=" << rs_reader->version().first << "-" - << rs_reader->version().second; + << ", version=" << rs_reader->version().first << "-" + << rs_reader->version().second; } sc_params.new_tablet->release_push_lock(); VLOG_TRACE << "succeed to convert a history version." - << " version=" << rs_reader->version().first << "-" << rs_reader->version().second; + << " version=" << rs_reader->version().first << "-" + << rs_reader->version().second; } // XXX: 此时应该不取消SchemaChange状态,因为新Delta还要转换成新旧Schema的版本 PROCESS_ALTER_EXIT : { @@ -1989,7 +1977,7 @@ OLAPStatus SchemaChangeHandler::_parse_request( column_mapping->ref_column = column_index; VLOG_NOTICE << "A column refered to existed column will be added after schema changing." - << "column=" << column_name << ", ref_column=" << column_index; + << "column=" << column_name << ", ref_column=" << column_index; continue; } @@ -2031,8 +2019,8 @@ OLAPStatus SchemaChangeHandler::_parse_request( } VLOG_TRACE << "A column with default value will be added after schema changing. " - << "column=" << column_name - << ", default_value=" << new_column.default_value(); + << "column=" << column_name + << ", default_value=" << new_column.default_value(); continue; } @@ -2044,8 +2032,8 @@ OLAPStatus SchemaChangeHandler::_parse_request( } VLOG_NOTICE << "A new schema delta is converted while dropping column. " - << "Dropped column will be assigned as '0' for the older schema. " - << "column=" << column_name; + << "Dropped column will be assigned as '0' for the older schema. " + << "column=" << column_name; } // Check if re-aggregation is needed. 
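
The schema_change.cpp changes above replace the goto-based error paths with RAII: a DeferOp runs the cleanup on every exit path, and a std::unique_ptr with a custom deleter hands RowBlocks back to their allocator. Below is a minimal standalone sketch of that pattern; Allocator, Block, and process are simplified stand-ins for the real RowBlockAllocator/RowBlock API, and the DeferOp here is only a local sketch in the spirit of util/defer_op.h.

#include <functional>
#include <memory>
#include <vector>

// Simplified stand-ins for RowBlockAllocator / RowBlock (hypothetical types).
struct Block {};
struct Allocator {
    Block* allocate() { return new Block(); }
    void release(Block* b) { delete b; }
};

// Scope guard in the spirit of util/defer_op.h: runs the callback on scope exit.
class DeferOp {
public:
    explicit DeferOp(std::function<void()> cb) : _cb(std::move(cb)) {}
    ~DeferOp() { _cb(); }

private:
    std::function<void()> _cb;
};

using BlockDeleter = std::function<void(Block*)>;

bool process(Allocator* allocator) {
    std::vector<Block*> blocks;
    // The cleanup that used to live after a goto label now runs on every return path.
    DeferOp defer([&]() {
        for (auto* b : blocks) {
            allocator->release(b);
        }
        blocks.clear();
    });

    // Ownership tied to the allocator instead of plain delete, as with the
    // unique_ptr<RowBlock, RowBlockDeleter> used in SchemaChangeDirectly::process.
    std::unique_ptr<Block, BlockDeleter> scoped(allocator->allocate(),
                                                [&](Block* b) { allocator->release(b); });

    blocks.push_back(allocator->allocate());
    if (/* some step fails */ false) {
        return false; // defer and scoped still release everything
    }
    return true;
}
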
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h index 0959470fa9..9566423ebc 100644 --- a/be/src/olap/schema_change.h +++ b/be/src/olap/schema_change.h @@ -19,6 +19,7 @@ #define DORIS_BE_SRC_OLAP_SCHEMA_CHANGE_H #include +#include #include #include @@ -245,6 +246,7 @@ private: DISALLOW_COPY_AND_ASSIGN(SchemaChangeHandler); }; +using RowBlockDeleter = std::function; } // namespace doris #endif // DORIS_BE_SRC_OLAP_SCHEMA_CHANGE_H diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp index c1c3a5b023..461a389629 100644 --- a/be/src/runtime/row_batch.cpp +++ b/be/src/runtime/row_batch.cpp @@ -604,6 +604,19 @@ void RowBatch::acquire_state(RowBatch* src) { src->transfer_resource_ownership(this); } +void RowBatch::deep_copy_to(RowBatch* dst) { + DCHECK(dst->_row_desc.equals(_row_desc)); + DCHECK_EQ(dst->_num_rows, 0); + DCHECK_GE(dst->_capacity, _num_rows); + dst->add_rows(_num_rows); + for (int i = 0; i < _num_rows; ++i) { + TupleRow* src_row = get_row(i); + TupleRow* dst_row = reinterpret_cast(dst->_tuple_ptrs + i * _num_tuples_per_row); + src_row->deep_copy(dst_row, _row_desc.tuple_descriptors(), dst->_tuple_data_pool.get(), + false); + } + dst->commit_rows(_num_rows); +} // TODO: consider computing size of batches as they are built up int RowBatch::total_byte_size() { int result = 0; diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index ce7e36a736..5e647bb5c8 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -260,7 +260,6 @@ Status TabletsChannel::cancel() { for (auto& it : _tablet_writers) { it.second->cancel(); } - DCHECK_EQ(_mem_tracker->consumption(), 0); _state = kFinished; return Status::OK(); } diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index 2685c1e9c3..78d9dc9975 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -55,95 +55,96 @@ Status HttpService::start() { add_default_path_handlers(_web_page_handler.get(), _env->process_mem_tracker()); // register load - _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_load", - new MiniLoadAction(_env)); + MiniLoadAction* miniload_action = _pool.add(new MiniLoadAction(_env)); + _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_load", miniload_action); + StreamLoadAction* streamload_action = _pool.add(new StreamLoadAction(_env)); _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_stream_load", - new StreamLoadAction(_env)); + streamload_action); // register download action std::vector allow_paths; for (auto& path : _env->store_paths()) { allow_paths.emplace_back(path.path); } - DownloadAction* download_action = new DownloadAction(_env, allow_paths); + DownloadAction* download_action = _pool.add(new DownloadAction(_env, allow_paths)); _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_download_load", download_action); _ev_http_server->register_handler(HttpMethod::GET, "/api/_download_load", download_action); - DownloadAction* tablet_download_action = new DownloadAction(_env, allow_paths); + DownloadAction* tablet_download_action = _pool.add(new DownloadAction(_env, allow_paths)); _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_tablet/_download", tablet_download_action); _ev_http_server->register_handler(HttpMethod::GET, "/api/_tablet/_download", tablet_download_action); DownloadAction* error_log_download_action = - new DownloadAction(_env, 
_env->load_path_mgr()->get_load_error_file_dir()); + _pool.add(new DownloadAction(_env, _env->load_path_mgr()->get_load_error_file_dir())); _ev_http_server->register_handler(HttpMethod::GET, "/api/_load_error_log", error_log_download_action); _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_load_error_log", error_log_download_action); // Register BE health action - HealthAction* health_action = new HealthAction(_env); + HealthAction* health_action = _pool.add(new HealthAction(_env)); _ev_http_server->register_handler(HttpMethod::GET, "/api/health", health_action); // Register Tablets Info action - TabletsInfoAction* tablets_info_action = new TabletsInfoAction(); + TabletsInfoAction* tablets_info_action = _pool.add(new TabletsInfoAction()); _ev_http_server->register_handler(HttpMethod::GET, "/tablets_json", tablets_info_action); // Register Tablets Distribution action - TabletsDistributionAction* tablets_distribution_action = new TabletsDistributionAction(); + TabletsDistributionAction* tablets_distribution_action = _pool.add(new TabletsDistributionAction()); _ev_http_server->register_handler(HttpMethod::GET, "/api/tablets_distribution", tablets_distribution_action); // Register tablet migration action - TabletMigrationAction* tablet_migration_action = new TabletMigrationAction(); + TabletMigrationAction* tablet_migration_action = _pool.add(new TabletMigrationAction()); _ev_http_server->register_handler(HttpMethod::GET, "/api/tablet_migration", tablet_migration_action); // register pprof actions - PprofActions::setup(_env, _ev_http_server.get()); + PprofActions::setup(_env, _ev_http_server.get(), _pool); // register metrics { - auto action = new MetricsAction(DorisMetrics::instance()->metric_registry()); + auto action = _pool.add(new MetricsAction(DorisMetrics::instance()->metric_registry())); _ev_http_server->register_handler(HttpMethod::GET, "/metrics", action); } - MetaAction* meta_action = new MetaAction(HEADER); + MetaAction* meta_action = _pool.add(new MetaAction(HEADER)); _ev_http_server->register_handler(HttpMethod::GET, "/api/meta/header/{tablet_id}/{schema_hash}", meta_action); #ifndef BE_TEST // Register BE checksum action - ChecksumAction* checksum_action = new ChecksumAction(_env); + ChecksumAction* checksum_action = _pool.add(new ChecksumAction(_env)); _ev_http_server->register_handler(HttpMethod::GET, "/api/checksum", checksum_action); // Register BE reload tablet action - ReloadTabletAction* reload_tablet_action = new ReloadTabletAction(_env); + ReloadTabletAction* reload_tablet_action = _pool.add(new ReloadTabletAction(_env)); _ev_http_server->register_handler(HttpMethod::GET, "/api/reload_tablet", reload_tablet_action); - RestoreTabletAction* restore_tablet_action = new RestoreTabletAction(_env); + RestoreTabletAction* restore_tablet_action = _pool.add(new RestoreTabletAction(_env)); _ev_http_server->register_handler(HttpMethod::POST, "/api/restore_tablet", restore_tablet_action); // Register BE snapshot action - SnapshotAction* snapshot_action = new SnapshotAction(_env); + SnapshotAction* snapshot_action = _pool.add(new SnapshotAction(_env)); _ev_http_server->register_handler(HttpMethod::GET, "/api/snapshot", snapshot_action); #endif // 2 compaction actions CompactionAction* show_compaction_action = - new CompactionAction(CompactionActionType::SHOW_INFO); + _pool.add(new CompactionAction(CompactionActionType::SHOW_INFO)); _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction/show", show_compaction_action); CompactionAction* run_compaction_action = 
- new CompactionAction(CompactionActionType::RUN_COMPACTION); + _pool.add(new CompactionAction(CompactionActionType::RUN_COMPACTION)); _ev_http_server->register_handler(HttpMethod::POST, "/api/compaction/run", run_compaction_action); CompactionAction* run_status_compaction_action = - new CompactionAction(CompactionActionType::RUN_COMPACTION_STATUS); + _pool.add(new CompactionAction(CompactionActionType::RUN_COMPACTION_STATUS)); _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction/run_status", run_status_compaction_action); - UpdateConfigAction* update_config_action = new UpdateConfigAction(); + UpdateConfigAction* update_config_action = _pool.add(new UpdateConfigAction()); _ev_http_server->register_handler(HttpMethod::POST, "/api/update_config", update_config_action); _ev_http_server->start(); @@ -152,6 +153,7 @@ Status HttpService::start() { void HttpService::stop() { _ev_http_server->stop(); + _pool.clear(); } } // namespace doris diff --git a/be/src/service/http_service.h b/be/src/service/http_service.h index d2bf39b333..d319ea2b48 100644 --- a/be/src/service/http_service.h +++ b/be/src/service/http_service.h @@ -20,6 +20,7 @@ #include #include "common/status.h" +#include "common/object_pool.h" namespace doris { @@ -38,6 +39,7 @@ public: private: ExecEnv* _env; + ObjectPool _pool; std::unique_ptr _ev_http_server; std::unique_ptr _web_page_handler;
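
The HttpService and PprofActions changes above register every HTTP handler through an ObjectPool member (_pool) so the handlers are released in HttpService::stop() instead of being leaked. A minimal sketch of that ownership pattern follows; the ObjectPool here is a simplified assumption with only the add()/clear() interface used in the patch (the real class lives in common/object_pool.h), and Service/HealthHandler are hypothetical names.

#include <functional>
#include <vector>

// Simplified ObjectPool: takes ownership of heap objects and frees them on clear().
class ObjectPool {
public:
    template <typename T>
    T* add(T* obj) {
        _deleters.emplace_back([obj]() { delete obj; });
        return obj;
    }
    void clear() {
        for (auto& d : _deleters) d();
        _deleters.clear();
    }
    ~ObjectPool() { clear(); }

private:
    std::vector<std::function<void()>> _deleters;
};

struct HttpHandler { virtual ~HttpHandler() = default; };
struct HealthHandler : HttpHandler {};

class Service {
public:
    void start() {
        // register_handler() only borrows the pointer; the pool owns it.
        register_handler(_pool.add(new HealthHandler()));
    }
    void stop() { _pool.clear(); }

private:
    void register_handler(HttpHandler*) {}
    ObjectPool _pool;
};
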
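The central fix of the patch (in analytic_eval_node, partitioned_aggregation_node, and the new RowBatch::deep_copy_to) is to copy tuple data out of spill-backed streams before a blocking ancestor caches the rows. Below is a minimal sketch of why the shallow copy is unsafe and what the deep copy changes; Pool, Row, and Batch are simplified stand-ins, not the real MemPool/TupleRow/RowBatch API.

#include <cstddef>
#include <cstring>
#include <memory>
#include <vector>

// Stand-in for MemPool: owns the byte buffers it hands out.
struct Pool {
    char* allocate(size_t n) {
        _chunks.emplace_back(new char[n]);
        return _chunks.back().get();
    }
    std::vector<std::unique_ptr<char[]>> _chunks;
};

struct Row {
    const char* data = nullptr;
    size_t len = 0;
};

struct Batch {
    std::vector<Row> rows;
    Pool pool; // owns the bytes the rows point into
};

// Shallow copy: dst rows still point into src's memory, which a spilling
// node may reuse or free after the next get_next() call.
void shallow_copy_to(const Batch& src, Batch* dst) {
    dst->rows = src.rows;
}

// Deep copy: dst rows point into dst's own pool, so they stay valid even
// after the source stream recycles its buffers (the RowBatch::deep_copy_to idea).
void deep_copy_to(const Batch& src, Batch* dst) {
    dst->rows.clear();
    for (const Row& r : src.rows) {
        char* copy = dst->pool.allocate(r.len);
        std::memcpy(copy, r.data, r.len);
        dst->rows.push_back(Row{copy, r.len});
    }
}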