diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index eece72cf5a..25360b3dda 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -764,7 +764,7 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
                                        _opts.enable_unique_key_merge_on_write);
     bool need_short_key_indexes =
             !need_primary_key_indexes ||
-            (need_primary_key_indexes && _tablet_schema->cluster_key_idxes().size() > 0);
+            (need_primary_key_indexes && !_tablet_schema->cluster_key_idxes().empty());
     if (need_primary_key_indexes && !need_short_key_indexes) { // mow table without cluster keys
         RETURN_IF_ERROR(_generate_primary_key_index(_key_coders, key_columns, seq_column,
                                                     num_rows, false));
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index 925b77ba51..87b1de3170 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -101,7 +101,7 @@ private:
 
 class SchemaChange {
 public:
-    SchemaChange() : _filtered_rows(0), _merged_rows(0) {}
+    SchemaChange() = default;
     virtual ~SchemaChange() = default;
 
     virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
@@ -118,7 +118,6 @@ public:
 
         RETURN_IF_ERROR(_inner_process(rowset_reader, rowset_writer, new_tablet,
                                        base_tablet_schema, new_tablet_schema));
-        _add_filtered_rows(rowset_reader->filtered_rows());
 
         // Check row num changes
         if (!_check_row_nums(rowset_reader, *rowset_writer)) {
@@ -147,21 +146,23 @@ protected:
     }
 
     virtual bool _check_row_nums(RowsetReaderSharedPtr reader, const RowsetWriter& writer) const {
-        if (reader->rowset()->num_rows() != writer.num_rows() + _merged_rows + _filtered_rows) {
+        if (reader->rowset()->num_rows() - reader->filtered_rows() !=
+            writer.num_rows() + writer.num_rows_filtered() + _merged_rows + _filtered_rows) {
             LOG(WARNING) << "fail to check row num! "
                          << "source_rows=" << reader->rowset()->num_rows()
-                         << ", writer rows=" << writer.num_rows()
+                         << ", source_filtered_rows=" << reader->filtered_rows()
+                         << ", written_rows=" << writer.num_rows()
+                         << ", writer_filtered_rows=" << writer.num_rows_filtered()
                          << ", merged_rows=" << merged_rows()
-                         << ", filtered_rows=" << filtered_rows()
-                         << ", new_index_rows=" << writer.num_rows();
+                         << ", filtered_rows=" << filtered_rows();
             return false;
         }
         return true;
     }
 
 private:
-    uint64_t _filtered_rows;
-    uint64_t _merged_rows;
+    uint64_t _filtered_rows {};
+    uint64_t _merged_rows {};
 };
 
 class LinkedSchemaChange : public SchemaChange {
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp
index cf8a994b1b..d46c692232 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -59,13 +59,16 @@ Status SetSinkOperatorX::sink(RuntimeState* state, vectorized::Blo
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
 
-    auto& mem_used = local_state._shared_state->mem_used;
     auto& build_block = local_state._shared_state->build_block;
     auto& valid_element_in_hash_tbl = local_state._shared_state->valid_element_in_hash_tbl;
 
     if (in_block->rows() != 0) {
-        mem_used += in_block->allocated_bytes();
         RETURN_IF_ERROR(local_state._mutable_block.merge(*in_block));
+
+        if (local_state._mutable_block.rows() > std::numeric_limits<uint32_t>::max()) {
+            return Status::NotSupported("set operator do not support build table rows over:" +
+                                        std::to_string(std::numeric_limits<uint32_t>::max()));
+        }
     }
 
     if (source_state == SourceState::FINISHED ||
diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h
index e44d59d7c6..4319ec8f43 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -86,8 +86,6 @@ public:
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
 
-    int64_t* mem_used() { return &_shared_state->mem_used; };
-
 private:
     friend class SetSinkOperatorX;
     template
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index c61611d10f..2e917cd4f5 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -489,8 +489,6 @@ struct SetSharedState : public BasicSharedState {
 public:
     SetSharedState(int num_deps) { probe_finished_children_dependency.resize(num_deps, nullptr); }
     /// default init
-    //record memory during running
-    int64_t mem_used = 0;
     vectorized::Block build_block; // build to source
     //record element size in hashtable
     int64_t valid_element_in_hash_tbl = 0;
diff --git a/be/src/vec/common/hash_table/hash_table_set_build.h b/be/src/vec/common/hash_table/hash_table_set_build.h
index 152b20eeef..04cc764525 100644
--- a/be/src/vec/common/hash_table/hash_table_set_build.h
+++ b/be/src/vec/common/hash_table/hash_table_set_build.h
@@ -25,21 +25,11 @@ template
 struct HashTableBuild {
     template
     HashTableBuild(Parent* parent, int rows, ColumnRawPtrs& build_raw_ptrs, RuntimeState* state)
-            : _mem_used(parent->mem_used()),
-              _rows(rows),
-              _build_raw_ptrs(build_raw_ptrs),
-              _state(state) {}
+            : _rows(rows), _build_raw_ptrs(build_raw_ptrs), _state(state) {}
 
     Status operator()(HashTableContext& hash_table_ctx, Arena& arena) {
         using KeyGetter = typename HashTableContext::State;
         using Mapped = typename HashTableContext::Mapped;
-        int64_t old_bucket_bytes = hash_table_ctx.hash_table->get_buffer_size_in_bytes();
-
-        Defer defer {[&]() {
-            int64_t bucket_bytes = hash_table_ctx.hash_table->get_buffer_size_in_bytes();
-            *_mem_used += bucket_bytes - old_bucket_bytes;
-        }};
-
         KeyGetter key_getter(_build_raw_ptrs);
         hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows);
 
@@ -60,7 +50,6 @@ struct HashTableBuild {
     }
 
 private:
-    int64_t* _mem_used = nullptr;
     const int _rows;
     ColumnRawPtrs& _build_raw_ptrs;
     RuntimeState* _state = nullptr;
diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp
index 28dfd23ec7..3e41a067b6 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -55,10 +55,7 @@ namespace vectorized {
 
 template
 VSetOperationNode::VSetOperationNode(ObjectPool* pool, const TPlanNode& tnode,
                                      const DescriptorTbl& descs)
-        : ExecNode(pool, tnode, descs),
-          _valid_element_in_hash_tbl(0),
-          _mem_used(0),
-          _build_finished(false) {
+        : ExecNode(pool, tnode, descs), _valid_element_in_hash_tbl(0), _build_finished(false) {
     _hash_table_variants = std::make_unique();
 }
@@ -229,20 +226,10 @@ Status VSetOperationNode::sink(RuntimeState* state, Block* block,
     SCOPED_TIMER(_exec_timer);
 
     if (block->rows() != 0) {
-        _mem_used += block->allocated_bytes();
-        RETURN_IF_ERROR(_mutable_block.merge(*block));
-    }
-
-    if (block->rows() != 0) {
-        if (_build_block.empty()) {
-            RETURN_IF_ERROR(_mutable_block.merge(*(block->create_same_struct_block(0, false))));
-        }
         RETURN_IF_ERROR(_mutable_block.merge(*block));
 
         if (_mutable_block.rows() > std::numeric_limits<uint32_t>::max()) {
-            return Status::NotSupported(
-                    "Hash join do not support build table rows"
-                    " over:" +
-                    std::to_string(std::numeric_limits<uint32_t>::max()));
+            return Status::NotSupported("set operator do not support build table rows over:" +
+                                        std::to_string(std::numeric_limits<uint32_t>::max()));
         }
     }
diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h
index 070ad381f4..3741fc564c 100644
--- a/be/src/vec/exec/vset_operation_node.h
+++ b/be/src/vec/exec/vset_operation_node.h
@@ -74,7 +74,6 @@ public:
     bool is_child_finished(int child_id) const;
 
     int64_t* valid_element_in_hash_tbl() { return &_valid_element_in_hash_tbl; }
-    int64_t* mem_used() { return &_mem_used; };
 
 private:
     void _finalize_probe(int child_id);
@@ -110,8 +109,6 @@ private:
     //first:column_id, could point to origin column or cast column
     //second:idx mapped to column types
     std::unordered_map _build_col_idx;
-    //record memory during running
-    int64_t _mem_used;
    //record insert column id during probe
    std::vector _probe_column_inserted_id;
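
The schema_change.h hunks above also rewrite the row-count invariant: rows dropped by the rowset reader and by the rowset writer are now counted explicitly instead of being folded into _filtered_rows through the removed _add_filtered_rows() call. The standalone sketch below only illustrates that arithmetic; RowCounts and check_row_nums are hypothetical names for this example, not Doris types.

#include <cstdint>
#include <iostream>

// Illustrative only: a hypothetical aggregate mirroring the quantities
// logged by the new _check_row_nums, not a Doris type.
struct RowCounts {
    uint64_t source_rows = 0;           // reader->rowset()->num_rows()
    uint64_t source_filtered_rows = 0;  // reader->filtered_rows()
    uint64_t written_rows = 0;          // writer.num_rows()
    uint64_t writer_filtered_rows = 0;  // writer.num_rows_filtered()
    uint64_t merged_rows = 0;           // _merged_rows
    uint64_t filtered_rows = 0;         // _filtered_rows
};

// Every row the reader produced (source rows minus reader-filtered rows) must
// be written, filtered by the writer, merged, or filtered by the schema change.
bool check_row_nums(const RowCounts& c) {
    return c.source_rows - c.source_filtered_rows ==
           c.written_rows + c.writer_filtered_rows + c.merged_rows + c.filtered_rows;
}

int main() {
    // 1000 source rows: 10 dropped by the reader, 970 written,
    // 15 dropped by the writer, 5 merged away. 990 == 990, so the check passes.
    RowCounts counts {1000, 10, 970, 15, 5, 0};
    std::cout << std::boolalpha << check_row_nums(counts) << '\n'; // prints: true
}

Read this way, the check still fails when rows genuinely go missing, while rows legitimately dropped on the read or write path are accounted for rather than reported as a mismatch.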