diff --git a/be/src/common/config.h b/be/src/common/config.h index 06bae98e2d..86bfd1981e 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -178,6 +178,7 @@ CONF_Int32(doris_scanner_thread_pool_thread_num, "48"); CONF_Int32(doris_scanner_thread_pool_queue_size, "102400"); // default thrift client connect timeout(in seconds) CONF_mInt32(thrift_connect_timeout_seconds, "3"); +CONF_mInt32(fetch_rpc_timeout_seconds, "20"); // default thrift client retry interval (in milliseconds) CONF_mInt64(thrift_client_retry_interval_ms, "1000"); // max row count number for single scan range, used in segmentv1 diff --git a/be/src/common/consts.h b/be/src/common/consts.h index 4e2045a1ce..3966a88346 100644 --- a/be/src/common/consts.h +++ b/be/src/common/consts.h @@ -25,6 +25,7 @@ const std::string CSV = "csv"; const std::string CSV_WITH_NAMES = "csv_with_names"; const std::string CSV_WITH_NAMES_AND_TYPES = "csv_with_names_and_types"; const std::string BLOCK_TEMP_COLUMN_PREFIX = "__TEMP__"; +const std::string ROWID_COL = "__DORIS_ROWID_COL__"; constexpr int MAX_DECIMAL32_PRECISION = 9; constexpr int MAX_DECIMAL64_PRECISION = 18; diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 678b865b1c..a36857d414 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -63,6 +63,7 @@ set(EXEC_FILES odbc_connector.cpp table_connector.cpp schema_scanner.cpp + rowid_fetcher.cpp ) if (WITH_LZO) diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp new file mode 100644 index 0000000000..a9a326c0e3 --- /dev/null +++ b/be/src/exec/rowid_fetcher.cpp @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/rowid_fetcher.h" + +#include "bthread/countdown_event.h" +#include "exec/tablet_info.h" // DorisNodesInfo +#include "gen_cpp/Types_types.h" +#include "gen_cpp/internal_service.pb.h" +#include "runtime/exec_env.h" // ExecEnv +#include "runtime/runtime_state.h" // RuntimeState +#include "util/brpc_client_cache.h" // BrpcClientCache +#include "util/defer_op.h" +#include "vec/core/block.h" // Block + +namespace doris { + +Status RowIDFetcher::init(DorisNodesInfo* nodes_info) { + for (auto [node_id, node_info] : nodes_info->nodes_info()) { + auto client = ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client( + node_info.host, node_info.brpc_port); + if (!client) { + LOG(WARNING) << "Get rpc stub failed, host=" << node_info.host + << ", port=" << node_info.brpc_port; + return Status::InternalError("RowIDFetcher failed to init rpc client"); + } + _stubs.push_back(client); + } + return Status::OK(); +} + +static std::string format_rowid(const GlobalRowLoacation& location) { + return fmt::format("{} {} {} {}", location.tablet_id, + location.row_location.rowset_id.to_string(), + location.row_location.segment_id, location.row_location.row_id); +} + +PMultiGetRequest RowIDFetcher::_init_fetch_request(const vectorized::ColumnString& row_ids) { + PMultiGetRequest mget_req; + _tuple_desc->to_protobuf(mget_req.mutable_desc()); + for (auto slot : _tuple_desc->slots()) { + 
slot->to_protobuf(mget_req.add_slots()); + } + for (size_t i = 0; i < row_ids.size(); ++i) { + PMultiGetRequest::RowId row_id; + StringRef row_id_rep = row_ids.get_data_at(i); + auto location = reinterpret_cast(row_id_rep.data); + row_id.set_tablet_id(location->tablet_id); + row_id.set_rowset_id(location->row_location.rowset_id.to_string()); + row_id.set_segment_id(location->row_location.segment_id); + row_id.set_ordinal_id(location->row_location.row_id); + *mget_req.add_rowids() = std::move(row_id); + } + mget_req.set_be_exec_version(_st->be_exec_version()); + return mget_req; +} + +static void fetch_callback(bthread::CountdownEvent* counter) { + Defer __defer([&] { counter->signal(); }); +} + +static Status MergeRPCResults(const std::vector& rsps, + const std::vector& cntls, + vectorized::MutableBlock* output_block) { + for (const auto& cntl : cntls) { + if (cntl.Failed()) { + LOG(WARNING) << "Failed to fetch meet rpc error:" << cntl.ErrorText() + << ", host:" << cntl.remote_side(); + return Status::InternalError(cntl.ErrorText()); + } + } + for (const auto& resp : rsps) { + Status st(resp.status()); + if (!st.ok()) { + LOG(WARNING) << "Failed to fetch " << st.to_string(); + return st; + } + vectorized::Block partial_block(resp.block()); + output_block->merge(partial_block); + } + return Status::OK(); +} + +Status RowIDFetcher::fetch(const vectorized::ColumnPtr& row_ids, + vectorized::MutableBlock* res_block) { + CHECK(!_stubs.empty()); + res_block->clear_column_data(); + vectorized::MutableBlock mblock({_tuple_desc}, row_ids->size()); + PMultiGetRequest mget_req = _init_fetch_request(assert_cast( + *vectorized::remove_nullable(row_ids).get())); + std::vector resps(_stubs.size()); + std::vector cntls(_stubs.size()); + bthread::CountdownEvent counter(_stubs.size()); + for (size_t i = 0; i < _stubs.size(); ++i) { + cntls[i].set_timeout_ms(config::fetch_rpc_timeout_seconds * 1000); + auto callback = brpc::NewCallback(fetch_callback, &counter); + 
_stubs[i]->multiget_data(&cntls[i], &mget_req, &resps[i], callback); + } + counter.wait(); + RETURN_IF_ERROR(MergeRPCResults(resps, cntls, &mblock)); + // final sort by row_ids sequence, since row_ids is already sorted + vectorized::Block tmp = mblock.to_block(); + std::unordered_map row_order; + vectorized::ColumnPtr row_id_column = tmp.get_columns().back(); + for (size_t x = 0; x < row_id_column->size(); ++x) { + auto location = + reinterpret_cast(row_id_column->get_data_at(x).data); + row_order[format_rowid(*location)] = x; + } + for (size_t x = 0; x < row_ids->size(); ++x) { + auto location = reinterpret_cast(row_ids->get_data_at(x).data); + res_block->add_row(&tmp, row_order[format_rowid(*location)]); + } + return Status::OK(); +} + +} // namespace doris diff --git a/be/src/exec/rowid_fetcher.h b/be/src/exec/rowid_fetcher.h new file mode 100644 index 0000000000..57101c5033 --- /dev/null +++ b/be/src/exec/rowid_fetcher.h @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "gen_cpp/internal_service.pb.h" +#include "vec/core/block.h" + +namespace doris { + +class DorisNodesInfo; +class RuntimeState; + +// fetch rows by global rowid +// tablet_id/rowset_name/segment_id/ordinal_id +class RowIDFetcher { +public: + RowIDFetcher(TupleDescriptor* desc, RuntimeState* st) : _tuple_desc(desc), _st(st) {} + Status init(DorisNodesInfo* nodes_info); + Status fetch(const vectorized::ColumnPtr& row_ids, vectorized::MutableBlock* block); + +private: + PMultiGetRequest _init_fetch_request(const vectorized::ColumnString& row_ids); + + std::vector> _stubs; + TupleDescriptor* _tuple_desc; + RuntimeState* _st; +}; + +} // namespace doris diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index 3e955b9b6b..76d4fa8094 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -249,6 +249,8 @@ public: return nullptr; } + const std::unordered_map& nodes_info() { return _nodes; } + private: std::unordered_map _nodes; }; diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index bc818982b0..9142ce0557 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -29,7 +29,7 @@ class IOBufAsZeroCopyInputStream; namespace doris { class Predicate; class ObjectPool; -class RuntimeState; +class ExprContext; class RuntimePredicateWrapper; class MemTracker; class TupleRow; diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h index d564b2b238..db1580d06d 100644 --- a/be/src/olap/iterators.h +++ b/be/src/olap/iterators.h @@ -119,6 +119,8 @@ public: vectorized::VExpr* remaining_vconjunct_root = nullptr; // runtime state RuntimeState* runtime_state = nullptr; + RowsetId rowset_id; + int32_t tablet_id = 0; }; class RowwiseIterator { diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index c56345eacb..6612fff758 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ 
-64,6 +64,8 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context _read_options.stats = _stats; _read_options.push_down_agg_type_opt = _context->push_down_agg_type_opt; _read_options.remaining_vconjunct_root = _context->remaining_vconjunct_root; + _read_options.rowset_id = _rowset->rowset_id(); + _read_options.tablet_id = _rowset->rowset_meta()->tablet_id(); if (read_context->lower_bound_keys != nullptr) { for (int i = 0; i < read_context->lower_bound_keys->size(); ++i) { _read_options.key_ranges.emplace_back(&read_context->lower_bound_keys->at(i), diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h index 1cbcb085af..2d57c33430 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.h +++ b/be/src/olap/rowset/segment_v2/column_reader.h @@ -417,6 +417,56 @@ private: vectorized::ColumnArray::ColumnOffsets& column_offsets); }; +class RowIdColumnIterator : public ColumnIterator { +public: + RowIdColumnIterator() = delete; + RowIdColumnIterator(int32_t tid, RowsetId rid, int32_t segid) + : _tablet_id(tid), _rowset_id(rid), _segment_id(segid) {} + + Status seek_to_first() override { + _current_rowid = 0; + return Status::OK(); + } + + Status seek_to_ordinal(ordinal_t ord_idx) override { + _current_rowid = ord_idx; + return Status::OK(); + } + + Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst) { + bool has_null; + return next_batch(n, dst, &has_null); + } + + Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override { + for (size_t i = 0; i < *n; ++i) { + rowid_t row_id = _current_rowid + i; + GlobalRowLoacation location(_tablet_id, _rowset_id, _segment_id, row_id); + dst->insert_data(reinterpret_cast(&location), sizeof(GlobalRowLoacation)); + } + _current_rowid += *n; + return Status::OK(); + } + + Status read_by_rowids(const rowid_t* rowids, const size_t count, + vectorized::MutableColumnPtr& dst) override { + for (size_t i = 0; i < count; 
++i) { + rowid_t row_id = rowids[i]; + GlobalRowLoacation location(_tablet_id, _rowset_id, _segment_id, row_id); + dst->insert_data(reinterpret_cast(&location), sizeof(GlobalRowLoacation)); + } + return Status::OK(); + } + + ordinal_t get_current_ordinal() const override { return _current_rowid; } + +private: + rowid_t _current_rowid = 0; + int32_t _tablet_id = 0; + RowsetId _rowset_id; + int32_t _segment_id = 0; +}; + // This iterator is used to read default value column class DefaultValueColumnIterator : public ColumnIterator { public: diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h index 1ec4943cf2..47a1ebd176 100644 --- a/be/src/olap/rowset/segment_v2/segment.h +++ b/be/src/olap/rowset/segment_v2/segment.h @@ -112,6 +112,8 @@ public: return _footer.primary_key_index_meta().max_key(); }; + io::FileReaderSPtr file_reader() { return _file_reader; } + private: DISALLOW_COPY_AND_ASSIGN(Segment); Segment(uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema); diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 659b49b6fe..7d3bde6c5d 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -179,6 +179,9 @@ Status SegmentIterator::init(const StorageReadOptions& opts) { _remaining_vconjunct_root = opts.remaining_vconjunct_root; _column_predicate_info.reset(new ColumnPredicateInfo()); + if (_schema.rowid_col_idx() > 0) { + _opts.record_rowids = true; + } return Status::OK(); } @@ -688,6 +691,11 @@ Status SegmentIterator::_init_return_column_iterators() { } for (auto cid : _schema.column_ids()) { int32_t unique_id = _opts.tablet_schema->column(cid).unique_id(); + if (_opts.tablet_schema->column(cid).name() == BeConsts::ROWID_COL) { + _column_iterators[unique_id] = + new RowIdColumnIterator(_opts.tablet_id, _opts.rowset_id, _segment->id()); + continue; + } if 
(_column_iterators.count(unique_id) < 1) { RETURN_IF_ERROR(_segment->new_column_iterator(_opts.tablet_schema->column(cid), &_column_iterators[unique_id])); diff --git a/be/src/olap/schema.h b/be/src/olap/schema.h index 4b06aed2b2..de639dc164 100644 --- a/be/src/olap/schema.h +++ b/be/src/olap/schema.h @@ -19,6 +19,7 @@ #include +#include "common/consts.h" #include "olap/aggregate_func.h" #include "olap/field.h" #include "olap/row_cursor_cell.h" @@ -52,6 +53,9 @@ public: if (column.is_key()) { ++num_key_columns; } + if (column.name() == BeConsts::ROWID_COL) { + _rowid_col_idx = cid; + } columns.push_back(column); } _delete_sign_idx = tablet_schema->delete_sign_idx(); @@ -72,6 +76,9 @@ public: if (columns[i].name() == DELETE_SIGN) { _delete_sign_idx = i; } + if (columns[i].name() == BeConsts::ROWID_COL) { + _rowid_col_idx = i; + } _unique_ids[i] = columns[i].unique_id(); } _init(columns, col_ids, num_key_columns); @@ -145,6 +152,7 @@ public: int32_t unique_id(size_t index) const { return _unique_ids[index]; } int32_t delete_sign_idx() const { return _delete_sign_idx; } bool has_sequence_col() const { return _has_sequence_col; } + int32_t rowid_col_idx() const { return _rowid_col_idx; }; private: void _init(const std::vector& cols, const std::vector& col_ids, @@ -169,6 +177,7 @@ private: size_t _schema_size; int32_t _delete_sign_idx = -1; bool _has_sequence_col = false; + int32_t _rowid_col_idx = -1; }; } // namespace doris diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index f6eda348f3..06d4b71ca1 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -260,6 +260,20 @@ Status Tablet::revise_tablet_meta(const std::vector& rowset return res; } +RowsetSharedPtr Tablet::get_rowset(const RowsetId& rowset_id) { + for (auto& version_rowset : _rs_version_map) { + if (version_rowset.second->rowset_id() == rowset_id) { + return version_rowset.second; + } + } + for (auto& stale_version_rowset : _stale_rs_version_map) { + if 
(stale_version_rowset.second->rowset_id() == rowset_id) { + return stale_version_rowset.second; + } + } + return nullptr; +} + Status Tablet::add_rowset(RowsetSharedPtr rowset) { DCHECK(rowset != nullptr); std::lock_guard wrlock(_meta_lock); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index e10b9628ac..71151a6b84 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -344,6 +344,8 @@ public: int64_t start = -1); bool should_skip_compaction(CompactionType compaction_type, int64_t now); + RowsetSharedPtr get_rowset(const RowsetId& rowset_id); + private: Status _init_once_action(); void _print_missed_versions(const std::vector& missed_versions) const; diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index bbbbfc896f..0ffcb3167d 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -64,6 +64,11 @@ public: size_t length() const { return _length; } size_t index_length() const { return _index_length; } void set_index_length(size_t index_length) { _index_length = index_length; } + void set_type(FieldType type) { _type = type; } + void set_is_key(bool is_key) { _is_key = is_key; } + void set_is_nullable(bool is_nullable) { _is_nullable = is_nullable; } + void set_unique_id(int32_t unique_id) { _unique_id = unique_id; } + void set_has_default_value(bool has) { _has_default_value = has; } FieldAggregationMethod aggregation() const { return _aggregation; } vectorized::AggregateFunctionPtr get_aggregate_function(vectorized::DataTypes argument_types, std::string suffix) const; diff --git a/be/src/olap/utils.h b/be/src/olap/utils.h index 964b973569..9c1a20d767 100644 --- a/be/src/olap/utils.h +++ b/be/src/olap/utils.h @@ -310,4 +310,11 @@ struct RowLocation { uint32_t row_id; }; +struct GlobalRowLoacation { + GlobalRowLoacation(uint32_t tid, RowsetId rsid, uint32_t sid, uint32_t rid) + : tablet_id(tid), row_location(rsid, sid, rid) {}; + uint32_t tablet_id; + RowLocation row_location; +}; + } // namespace doris 
diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index 9eea456fb2..4272e9aa56 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -63,7 +63,9 @@ SlotDescriptor::SlotDescriptor(const TSlotDescriptor& tdesc) _slot_idx(tdesc.slotIdx), _slot_size(_type.get_slot_size()), _field_idx(-1), - _is_materialized(tdesc.isMaterialized) {} + _is_materialized(tdesc.isMaterialized), + _is_key(tdesc.is_key), + _need_materialize(tdesc.need_materialize) {} SlotDescriptor::SlotDescriptor(const PSlotDescriptor& pdesc) : _id(pdesc.id()), @@ -74,11 +76,13 @@ SlotDescriptor::SlotDescriptor(const PSlotDescriptor& pdesc) _null_indicator_offset(pdesc.null_indicator_byte(), pdesc.null_indicator_bit()), _col_name(pdesc.col_name()), _col_name_lower_case(to_lower(pdesc.col_name())), - _col_unique_id(-1), + _col_unique_id(pdesc.col_unique_id()), _slot_idx(pdesc.slot_idx()), _slot_size(_type.get_slot_size()), _field_idx(-1), - _is_materialized(pdesc.is_materialized()) {} + _is_materialized(pdesc.is_materialized()), + _is_key(pdesc.is_key()), + _need_materialize(true) {} void SlotDescriptor::to_protobuf(PSlotDescriptor* pslot) const { pslot->set_id(_id); @@ -92,6 +96,8 @@ void SlotDescriptor::to_protobuf(PSlotDescriptor* pslot) const { pslot->set_col_name(_col_name); pslot->set_slot_idx(_slot_idx); pslot->set_is_materialized(_is_materialized); + pslot->set_col_unique_id(_col_unique_id); + pslot->set_is_key(_is_key); } vectorized::MutableColumnPtr SlotDescriptor::get_empty_mutable_column() const { @@ -542,6 +548,9 @@ int RowDescriptor::get_column_id(int slot_id) const { int column_id_counter = 0; for (const auto tuple_desc : _tuple_desc_map) { for (const auto slot : tuple_desc->slots()) { + if (!slot->need_materialize()) { + continue; + } if (slot->id() == slot_id) { return column_id_counter; } diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index c49eebb6db..86e58a39a9 100644 --- a/be/src/runtime/descriptors.h +++ 
b/be/src/runtime/descriptors.h @@ -51,6 +51,7 @@ class SchemaScanner; class OlapTableSchemaParam; class PTupleDescriptor; class PSlotDescriptor; +class PInternalServiceImpl; // Location information for null indicator bit for particular slot. // For non-nullable slots, the byte_offset will be 0 and the bit_mask will be 0. @@ -116,11 +117,15 @@ public: int32_t col_unique_id() const { return _col_unique_id; } + bool is_key() const { return _is_key; } + bool need_materialize() const { return _need_materialize; } + private: friend class DescriptorTbl; friend class TupleDescriptor; friend class SchemaScanner; friend class OlapTableSchemaParam; + friend class PInternalServiceImpl; const SlotId _id; const TypeDescriptor _type; @@ -147,6 +152,9 @@ private: const bool _is_materialized; + const bool _is_key; + const bool _need_materialize; + SlotDescriptor(const TSlotDescriptor& tdesc); SlotDescriptor(const PSlotDescriptor& pdesc); }; @@ -342,6 +350,7 @@ private: friend class DescriptorTbl; friend class SchemaScanner; friend class OlapTableSchemaParam; + friend class PInternalServiceImpl; const TupleId _id; TableDescriptor* _table_desc; diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_fragments_ctx.h index 4d480c9efe..a01c3924c0 100644 --- a/be/src/runtime/query_fragments_ctx.h +++ b/be/src/runtime/query_fragments_ctx.h @@ -165,7 +165,6 @@ private: std::atomic _is_cancelled {false}; std::shared_ptr _shared_hash_table_controller; - vectorized::RuntimePredicate _runtime_predicate; }; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 86c6a0347d..7eafe18bda 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -22,10 +22,14 @@ #include #include "common/config.h" +#include "common/consts.h" #include "gen_cpp/BackendService.h" #include "gen_cpp/internal_service.pb.h" #include "http/http_client.h" +#include "olap/rowset/beta_rowset.h" #include "olap/rowset/rowset_factory.h" 
+#include "olap/rowset/segment_v2/column_reader.h" +#include "olap/segment_loader.h" #include "olap/storage_engine.h" #include "olap/tablet.h" #include "runtime/buffer_control_block.h" @@ -39,6 +43,7 @@ #include "runtime/thread_context.h" #include "service/brpc.h" #include "util/brpc_client_cache.h" +#include "util/defer_op.h" #include "util/md5.h" #include "util/proto_util.h" #include "util/ref_count_closure.h" @@ -48,6 +53,8 @@ #include "util/telemetry/telemetry.h" #include "util/thrift_util.h" #include "util/uid_util.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type_string.h" #include "vec/exec/format/csv/csv_reader.h" #include "vec/exec/format/generic_reader.h" #include "vec/exec/format/json/new_json_reader.h" @@ -949,4 +956,130 @@ void PInternalServiceImpl::response_slave_tablet_pull_rowset( Status::OK().to_protobuf(response->mutable_status()); } +static Status read_by_rowids( + std::pair row_range_idx, const TupleDescriptor& desc, + const google::protobuf::RepeatedPtrField& rowids, + vectorized::Block* sub_block) { + //read from row_range.first to row_range.second + for (size_t i = row_range_idx.first; i < row_range_idx.second; ++i) { + MonotonicStopWatch watch; + watch.start(); + auto row_id = rowids[i]; + TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet( + row_id.tablet_id(), true /*include deleted*/); + RowsetId rowset_id; + rowset_id.init(row_id.rowset_id()); + if (!tablet) { + continue; + } + BetaRowsetSharedPtr rowset = + std::static_pointer_cast(tablet->get_rowset(rowset_id)); + if (!rowset) { + LOG(INFO) << "no such rowset " << rowset_id; + continue; + } + const TabletSchemaSPtr tablet_schema = rowset->tablet_schema(); + VLOG_DEBUG << "get tablet schema column_num:" << tablet_schema->num_columns() + << ", version:" << tablet_schema->schema_version() + << ", cost(us):" << watch.elapsed_time() / 1000; + SegmentCacheHandle segment_cache; + RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(rowset, 
&segment_cache, true)); + // find segment + auto it = std::find_if(segment_cache.get_segments().begin(), + segment_cache.get_segments().end(), + [&row_id](const segment_v2::SegmentSharedPtr& seg) { + return seg->id() == row_id.segment_id(); + }); + if (it == segment_cache.get_segments().end()) { + continue; + } + segment_v2::SegmentSharedPtr segment = *it; + for (int x = 0; x < desc.slots().size() - 1; ++x) { + int index = tablet_schema->field_index(desc.slots()[x]->col_unique_id()); + segment_v2::ColumnIterator* column_iterator = nullptr; + vectorized::MutableColumnPtr column = + sub_block->get_by_position(x).column->assume_mutable(); + if (index < 0) { + column->insert_default(); + continue; + } else { + RETURN_IF_ERROR(segment->new_column_iterator(tablet_schema->column(index), + &column_iterator)); + } + std::unique_ptr ptr_guard(column_iterator); + segment_v2::ColumnIteratorOptions opt; + OlapReaderStatistics stats; + opt.file_reader = segment->file_reader().get(); + opt.stats = &stats; + opt.use_page_cache = !config::disable_storage_page_cache; + column_iterator->init(opt); + std::vector rowids { + static_cast(row_id.ordinal_id())}; + RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), 1, column)); + } + LOG_EVERY_N(INFO, 100) << "multiget_data single_row, cost(us):" + << watch.elapsed_time() / 1000; + GlobalRowLoacation row_location(row_id.tablet_id(), rowset->rowset_id(), + row_id.segment_id(), row_id.ordinal_id()); + sub_block->get_columns().back()->assume_mutable()->insert_data( + reinterpret_cast(&row_location), sizeof(GlobalRowLoacation)); + } + return Status::OK(); +} + +Status PInternalServiceImpl::_multi_get(const PMultiGetRequest* request, + PMultiGetResponse* response) { + TupleDescriptor desc(request->desc()); + std::vector slots; + slots.reserve(request->slots().size()); + for (const auto& pslot : request->slots()) { + slots.push_back(SlotDescriptor(pslot)); + desc.add_slot(&slots.back()); + } + assert(desc.slots().back()->col_name() == 
BeConsts::ROWID_COL); + vectorized::Block block(desc.slots(), request->rowids().size()); + RETURN_IF_ERROR( + read_by_rowids(std::pair {0, request->rowids_size()}, desc, request->rowids(), &block)); + std::vector char_type_idx; + for (size_t i = 0; i < desc.slots().size(); i++) { + auto column_desc = desc.slots()[i]; + auto type_desc = column_desc->type(); + do { + if (type_desc.type == TYPE_CHAR) { + char_type_idx.emplace_back(i); + break; + } else if (type_desc.type != TYPE_ARRAY) { + break; + } + // for Array or Array> + type_desc = type_desc.children[0]; + } while (true); + } + // shrink char_type suffix zero data + block.shrink_char_type_column_suffix_zero(char_type_idx); + VLOG_DEBUG << "dump block:" << block.dump_data(0, 10) + << ", be_exec_version:" << request->be_exec_version(); + + [[maybe_unused]] size_t compressed_size = 0; + [[maybe_unused]] size_t uncompressed_size = 0; + int be_exec_version = request->has_be_exec_version() ? request->be_exec_version() : 0; + RETURN_IF_ERROR(block.serialize(be_exec_version, response->mutable_block(), &uncompressed_size, + &compressed_size, segment_v2::CompressionTypePB::LZ4)); + return Status::OK(); +} + +void PInternalServiceImpl::multiget_data(google::protobuf::RpcController* controller, + const PMultiGetRequest* request, + PMultiGetResponse* response, + google::protobuf::Closure* done) { + // multi get data by rowid + MonotonicStopWatch watch; + watch.start(); + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(0); + Status st = _multi_get(request, response); + st.to_protobuf(response->mutable_status()); + LOG(INFO) << "multiget_data finished, cost(us):" << watch.elapsed_time() / 1000; +} + } // namespace doris diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 9b2a4db254..a591fbd65c 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -151,6 +151,8 @@ public: const PTabletWriteSlaveDoneRequest* request, 
PTabletWriteSlaveDoneResult* response, google::protobuf::Closure* done) override; + void multiget_data(google::protobuf::RpcController* controller, const PMultiGetRequest* request, + PMultiGetResponse* response, google::protobuf::Closure* done) override; private: Status _exec_plan_fragment(const std::string& s_request, PFragmentRequestVersion version, @@ -176,6 +178,7 @@ private: void _response_pull_slave_rowset(const std::string& remote_host, int64_t brpc_port, int64_t txn_id, int64_t tablet_id, int64_t node_id, bool is_succeed); + Status _multi_get(const PMultiGetRequest* request, PMultiGetResponse* response); private: ExecEnv* _exec_env; diff --git a/be/src/vec/common/sort/heap_sorter.cpp b/be/src/vec/common/sort/heap_sorter.cpp index 0ab1b81795..18bddaaf7a 100644 --- a/be/src/vec/common/sort/heap_sorter.cpp +++ b/be/src/vec/common/sort/heap_sorter.cpp @@ -45,6 +45,9 @@ Status HeapSorter::append_block(Block* block) { int i = 0; const auto& convert_nullable_flags = _vsort_exec_exprs.get_convert_nullable_flags(); for (auto column_id : valid_column_ids) { + if (column_id < 0) { + continue; + } if (convert_nullable_flags[i]) { auto column_ptr = make_nullable(block->get_by_position(column_id).column); new_block.insert({column_ptr, diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index 9bc0a1034c..225deffa2b 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -247,6 +247,9 @@ Status Sorter::partial_sort(Block& src_block, Block& dest_block) { int i = 0; const auto& convert_nullable_flags = _vsort_exec_exprs.get_convert_nullable_flags(); for (auto column_id : valid_column_ids) { + if (column_id < 0) { + continue; + } if (convert_nullable_flags[i]) { auto column_ptr = make_nullable(src_block.get_by_position(column_id).column); new_block.insert( diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h index 697453fe5a..486e336e57 100644 --- a/be/src/vec/common/sort/sorter.h +++ 
b/be/src/vec/common/sort/sorter.h @@ -18,6 +18,7 @@ #pragma once #include +#include "common/consts.h" #include "common/status.h" #include "vec/common/sort/vsort_exec_exprs.h" #include "vec/core/block.h" @@ -34,7 +35,11 @@ class MergeSorterState { public: MergeSorterState(const RowDescriptor& row_desc, int64_t offset, int64_t limit, RuntimeState* state, RuntimeProfile* profile) - : unsorted_block_(new Block(VectorizedUtils::create_empty_block(row_desc))), + // create_empty_block should ignore invalid slots, unsorted_block + // should be same structure with arrival block from child node + // since block from child node may ignored these slots + : unsorted_block_(new Block( + VectorizedUtils::create_empty_block(row_desc, true /*ignore invalid slot*/))), offset_(offset), limit_(limit), profile_(profile) { diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index ebe46fdd0c..a23ecb2a75 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -54,8 +54,12 @@ Block::Block(const ColumnsWithTypeAndName& data_) : data {data_} { initialize_index_by_name(); } -Block::Block(const std::vector& slots, size_t block_size) { +Block::Block(const std::vector& slots, size_t block_size, + bool ignore_trivial_slot) { for (const auto slot_desc : slots) { + if (ignore_trivial_slot && !slot_desc->need_materialize()) { + continue; + } auto column_ptr = slot_desc->get_empty_mutable_column(); column_ptr->reserve(block_size); insert(ColumnWithTypeAndName(std::move(column_ptr), slot_desc->get_data_type_ptr(), @@ -919,9 +923,13 @@ void Block::deep_copy_slot(void* dst, MemPool* pool, const doris::TypeDescriptor } } -MutableBlock::MutableBlock(const std::vector& tuple_descs, int reserve_size) { +MutableBlock::MutableBlock(const std::vector& tuple_descs, int reserve_size, + bool ignore_trivial_slot) { for (auto tuple_desc : tuple_descs) { for (auto slot_desc : tuple_desc->slots()) { + if (ignore_trivial_slot && !slot_desc->need_materialize()) { + continue; + } 
_data_types.emplace_back(slot_desc->get_data_type_ptr()); _columns.emplace_back(_data_types.back()->create_column()); if (reserve_size != 0) { diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 220a0ff361..4ebc018b5b 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -74,7 +74,8 @@ public: Block(std::initializer_list il); Block(const ColumnsWithTypeAndName& data_); Block(const PBlock& pblock); - Block(const std::vector& slots, size_t block_size); + Block(const std::vector& slots, size_t block_size, + bool ignore_trivial_slot = false); /// insert the column at the specified position void insert(size_t position, const ColumnWithTypeAndName& elem); @@ -391,7 +392,8 @@ public: MutableBlock() = default; ~MutableBlock() = default; - MutableBlock(const std::vector& tuple_descs, int reserve_size = 0); + MutableBlock(const std::vector& tuple_descs, int reserve_size = 0, + bool igore_trivial_slot = false); MutableBlock(Block* block) : _columns(block->mutate_columns()), _data_types(block->get_data_types()) {} diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 049c5e2882..31991e3999 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -91,6 +91,22 @@ Status NewOlapScanner::prepare(const TPaloScanRange& scan_range, _tablet_schema->append_column(TabletColumn(column_desc)); } } + + { + if (_output_tuple_desc->slots().back()->col_name() == BeConsts::ROWID_COL) { + // inject ROWID_COL + TabletColumn rowid_column; + rowid_column.set_is_nullable(false); + rowid_column.set_name(BeConsts::ROWID_COL); + // avoid column reader init error + rowid_column.set_has_default_value(true); + // fake unique id + rowid_column.set_unique_id(INT32_MAX); + rowid_column.set_type(FieldType::OLAP_FIELD_TYPE_STRING); + _tablet_schema->append_column(rowid_column); + } + } + { std::shared_lock rdlock(_tablet->get_header_lock()); const RowsetSharedPtr rowset = 
_tablet->rowset_with_max_version(); @@ -333,7 +349,9 @@ Status NewOlapScanner::_init_return_columns() { if (!slot->is_materialized()) { continue; } - + if (!slot->need_materialize()) { + continue; + } int32_t index = slot->col_unique_id() >= 0 ? _tablet_schema->field_index(slot->col_unique_id()) : _tablet_schema->field_index(slot->col_name()); diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 29b379ecfe..6ce31490d0 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -55,7 +55,8 @@ Status ScannerContext::init() { // So use _output_tuple_desc; int64_t free_blocks_memory_usage = 0; for (int i = 0; i < pre_alloc_block_count; ++i) { - auto block = new vectorized::Block(_output_tuple_desc->slots(), real_block_size); + auto block = new vectorized::Block(_output_tuple_desc->slots(), real_block_size, + true /*ignore invalid slots*/); free_blocks_memory_usage += block->allocated_bytes(); _free_blocks.emplace_back(block); } @@ -93,7 +94,8 @@ vectorized::Block* ScannerContext::get_free_block(bool* get_free_block) { *get_free_block = false; COUNTER_UPDATE(_parent->_newly_create_free_blocks_num, 1); - return new vectorized::Block(_real_tuple_desc->slots(), _state->batch_size()); + return new vectorized::Block(_real_tuple_desc->slots(), _state->batch_size(), + true /*ignore invalid slots*/); } void ScannerContext::return_free_block(vectorized::Block* block) { diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index a718687c6b..7a956ecdb5 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -40,6 +40,10 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num; if (!block->mem_reuse()) { for (const auto slot_desc : _output_tuple_desc->slots()) { + if (!slot_desc->need_materialize()) { + // should be ignore from 
reading + continue; + } block->insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(), slot_desc->get_data_type_ptr(), slot_desc->col_name())); @@ -80,8 +84,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { Status VScanner::_filter_output_block(Block* block) { auto old_rows = block->rows(); - Status st = - VExprContext::filter_block(_vconjunct_ctx, block, _output_tuple_desc->slots().size()); + Status st = VExprContext::filter_block(_vconjunct_ctx, block, block->columns()); _counter.num_rows_unselected += old_rows - block->rows(); return st; } diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp index 069caaf635..d31da2418c 100644 --- a/be/src/vec/exec/vexchange_node.cpp +++ b/be/src/vec/exec/vexchange_node.cpp @@ -17,12 +17,15 @@ #include "vec/exec/vexchange_node.h" +#include "common/consts.h" +#include "exec/rowid_fetcher.h" #include "pipeline/exec/exchange_source_operator.h" #include "pipeline/pipeline.h" #include "pipeline/pipeline_fragment_context.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" +#include "util/defer_op.h" #include "vec/runtime/vdata_stream_mgr.h" #include "vec/runtime/vdata_stream_recvr.h" @@ -45,10 +48,15 @@ Status VExchangeNode::init(const TPlanNode& tnode, RuntimeState* state) { if (!_is_merging) { return Status::OK(); } - RETURN_IF_ERROR(_vsort_exec_exprs.init(tnode.exchange_node.sort_info, _pool)); _is_asc_order = tnode.exchange_node.sort_info.is_asc_order; _nulls_first = tnode.exchange_node.sort_info.nulls_first; + + if (tnode.exchange_node.__isset.nodes_info) { + _nodes_info = _pool->add(new DorisNodesInfo(tnode.exchange_node.nodes_info)); + } + _use_two_phase_read = tnode.exchange_node.sort_info.__isset.use_two_phase_read && + tnode.exchange_node.sort_info.use_two_phase_read; return Status::OK(); } @@ -87,6 +95,19 @@ Status VExchangeNode::open(RuntimeState* state) { return Status::OK(); } +Status 
VExchangeNode::_second_phase_fetch_data(RuntimeState* state, Block* final_block) { + auto row_id_col = final_block->get_by_position(final_block->columns() - 1); + auto tuple_desc = _row_descriptor.tuple_descriptors()[0]; + RowIDFetcher id_fetcher(tuple_desc, state); + RETURN_IF_ERROR(id_fetcher.init(_nodes_info)); + MutableBlock materialized_block(_row_descriptor.tuple_descriptors(), final_block->rows()); + // fetch will sort block by sequence of ROWID_COL + RETURN_IF_ERROR(id_fetcher.fetch(row_id_col.column, &materialized_block)); + // Notice swap may change the structure of final_block + final_block->swap(materialized_block.to_block()); + return Status::OK(); +} + Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) { INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VExchangeNode::get_next"); SCOPED_TIMER(runtime_profile()->total_time_counter()); @@ -97,6 +118,12 @@ Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) { _is_ready = true; return Status::OK(); } + if (_use_two_phase_read) { + // Block structure may be changed by calling _second_phase_fetch_data() before. + // So we should clear block before _stream_recvr->get_next, since + // blocks in VSortedRunMerger may not compatible with this block. 
+ block->clear(); + } auto status = _stream_recvr->get_next(block, eos); if (block != nullptr) { if (!_is_merging) { @@ -119,6 +146,9 @@ Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) { } COUNTER_SET(_rows_returned_counter, _num_rows_returned); } + if (_use_two_phase_read && block->rows() > 0) { + RETURN_IF_ERROR(_second_phase_fetch_data(state, block)); + } return status; } diff --git a/be/src/vec/exec/vexchange_node.h b/be/src/vec/exec/vexchange_node.h index 2c63e03a5c..1d2738fda0 100644 --- a/be/src/vec/exec/vexchange_node.h +++ b/be/src/vec/exec/vexchange_node.h @@ -20,6 +20,8 @@ #include #include "exec/exec_node.h" +#include "exec/tablet_info.h" // DorisNodesInfo +#include "runtime/descriptors.h" #include "vec/common/sort/vsort_exec_exprs.h" namespace doris { @@ -47,6 +49,9 @@ public: // Status collect_query_statistics(QueryStatistics* statistics) override; void set_num_senders(int num_senders) { _num_senders = num_senders; } + // final materializtion, used only in topn node + Status _second_phase_fetch_data(RuntimeState* state, Block* final_block); + private: int _num_senders; bool _is_merging; @@ -61,6 +66,10 @@ private: VSortExecExprs _vsort_exec_exprs; std::vector _is_asc_order; std::vector _nulls_first; + + // for fetch data by rowids + DorisNodesInfo* _nodes_info = nullptr; + bool _use_two_phase_read = false; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index fd7c88af75..3744c349c6 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -82,7 +82,6 @@ Status VSortNode::init(const TPlanNode& tnode, RuntimeState* state) { } _sorter->init_profile(_runtime_profile.get()); - return Status::OK(); } @@ -127,7 +126,6 @@ Status VSortNode::sink(RuntimeState* state, vectorized::Block* input_block, bool old_top = std::move(new_top); } } - if (!_reuse_mem) { input_block->clear(); } diff --git a/be/src/vec/exprs/vslot_ref.cpp 
b/be/src/vec/exprs/vslot_ref.cpp index 18871ac773..2ba9c0b526 100644 --- a/be/src/vec/exprs/vslot_ref.cpp +++ b/be/src/vec/exprs/vslot_ref.cpp @@ -50,12 +50,17 @@ Status VSlotRef::prepare(doris::RuntimeState* state, const doris::RowDescriptor& if (slot_desc == nullptr) { return Status::InternalError("couldn't resolve slot descriptor {}", _slot_id); } + _column_name = &slot_desc->col_name(); + if (!slot_desc->need_materialize()) { + // slot should be ignored manually + _column_id = -1; + return Status::OK(); + } _column_id = desc.get_column_id(_slot_id); if (_column_id < 0) { LOG(INFO) << "VSlotRef - invalid slot id: " << _slot_id << " desc:" << desc.debug_string(); return Status::InternalError("VSlotRef - invalid slot id {}", _slot_id); } - _column_name = &slot_desc->col_name(); return Status::OK(); } diff --git a/be/src/vec/utils/util.hpp b/be/src/vec/utils/util.hpp index da8e3d19af..80574d0ae5 100644 --- a/be/src/vec/utils/util.hpp +++ b/be/src/vec/utils/util.hpp @@ -34,10 +34,14 @@ public: return create_columns_with_type_and_name(row_desc); } - static ColumnsWithTypeAndName create_columns_with_type_and_name(const RowDescriptor& row_desc) { + static ColumnsWithTypeAndName create_columns_with_type_and_name( + const RowDescriptor& row_desc, bool ignore_trivial_slot = false) { ColumnsWithTypeAndName columns_with_type_and_name; for (const auto& tuple_desc : row_desc.tuple_descriptors()) { for (const auto& slot_desc : tuple_desc->slots()) { + if (ignore_trivial_slot && !slot_desc->need_materialize()) { + continue; + } columns_with_type_and_name.emplace_back(nullptr, slot_desc->get_data_type_ptr(), slot_desc->col_name()); } @@ -45,10 +49,14 @@ public: return columns_with_type_and_name; } - static ColumnsWithTypeAndName create_empty_block(const RowDescriptor& row_desc) { + static ColumnsWithTypeAndName create_empty_block(const RowDescriptor& row_desc, + bool ignore_trivial_slot = false) { ColumnsWithTypeAndName columns_with_type_and_name; for (const auto& tuple_desc : 
row_desc.tuple_descriptors()) { for (const auto& slot_desc : tuple_desc->slots()) { + if (ignore_trivial_slot && !slot_desc->need_materialize()) { + continue; + } columns_with_type_and_name.emplace_back( slot_desc->get_data_type_ptr()->create_column(), slot_desc->get_data_type_ptr(), slot_desc->col_name()); diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index fa6d78c8ed..d9472377a3 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1955,5 +1955,8 @@ public class Config extends ConfigBase { */ @ConfField(masterOnly = true) public static int hms_events_polling_interval_ms = 10000; + + @ConfField(mutable = false) + public static int topn_two_phase_limit_threshold = 512; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java index 151bb22717..c9884614f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java @@ -551,6 +551,10 @@ public class Analyzer { isInlineView = inlineView; } + public boolean isInlineViewAnalyzer() { + return isInlineView; + } + public void setExplicitViewAlias(String alias) { explicitViewAlias = alias; } @@ -997,6 +1001,9 @@ public class Analyzer { result.setStats(srcSlotDesc.getStats()); result.setType(srcSlotDesc.getType()); result.setIsNullable(srcSlotDesc.getIsNullable()); + if (srcSlotDesc.getColumn() != null) { + result.setColumn(srcSlotDesc.getColumn()); + } // result.setItemTupleDesc(srcSlotDesc.getItemTupleDesc()); return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java index f4915a4e58..724d2effbe 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java @@ -27,6 +27,7 @@ import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.FunctionSet; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.Type; @@ -557,7 +558,6 @@ public class SelectStmt extends QueryStmt { "cannot combine SELECT DISTINCT with analytic functions"); } } - whereClauseRewrite(); if (whereClause != null) { if (checkGroupingFn(whereClause)) { @@ -576,7 +576,6 @@ public class SelectStmt extends QueryStmt { } analyzer.registerConjuncts(whereClause, false, getTableRefIds()); } - createSortInfo(analyzer); if (sortInfo != null && CollectionUtils.isNotEmpty(sortInfo.getOrderingExprs())) { if (groupingInfo != null) { @@ -591,6 +590,33 @@ public class SelectStmt extends QueryStmt { analyzeAggregation(analyzer); createAnalyticInfo(analyzer); eliminatingSortNode(); + if (checkEnableTwoPhaseRead(analyzer)) { + // If optimize enabled, we try our best to read less columns from ScanNode, + // here we analyze conjunct exprs and ordering exprs before resultExprs, + // rest of resultExprs will be marked as `INVALID`, such columns will + // be prevent from reading from ScanNode.Those columns will be finally + // read by the second fetch phase + LOG.debug("two phase read optimize enabled"); + // Expr.analyze(resultExprs, analyzer); + Set resultSlots = Sets.newHashSet(); + Set orderingSlots = Sets.newHashSet(); + Set conjuntSlots = Sets.newHashSet(); + TreeNode.collect(resultExprs, Predicates.instanceOf(SlotRef.class), resultSlots); + TreeNode.collect(sortInfo.getOrderingExprs(), Predicates.instanceOf(SlotRef.class), orderingSlots); + if (whereClause != null) { + whereClause.collect(SlotRef.class, conjuntSlots); + } + 
resultSlots.removeAll(orderingSlots); + resultSlots.removeAll(conjuntSlots); + // reset slots need to do fetch column + for (SlotRef slot : resultSlots) { + // invalid slots will be pruned from reading from ScanNode + slot.setInvalid(); + } + LOG.debug("resultsSlots {}", resultSlots); + LOG.debug("orderingSlots {}", orderingSlots); + LOG.debug("conjuntSlots {}", conjuntSlots); + } if (evaluateOrderBy) { createSortTupleInfo(analyzer); } @@ -615,6 +641,72 @@ public class SelectStmt extends QueryStmt { } } + // Check whether enable two phase read optimize, if enabled query will be devieded into two phase read: + // 1. read conjuncts columns and order by columns along with an extra RowId column from ScanNode + // 2. sort and filter data, and get final RowId column, spawn RPC to other BE to fetch final data + // 3. final matrialize all data + public boolean checkEnableTwoPhaseRead(Analyzer analyzer) { + // only vectorized mode and session opt variable enabled + if (ConnectContext.get() == null + || ConnectContext.get().getSessionVariable() == null + || !ConnectContext.get().getSessionVariable().enableVectorizedEngine + || !ConnectContext.get().getSessionVariable().enableTwoPhaseReadOpt) { + return false; + } + if (!evaluateOrderBy) { + // Need evaluate orderby, if sort node was eliminated then this optmization + // could be useless + return false; + } + // Only handle the simplest `SELECT ... FROM WHERE ... ORDER BY ... 
LIMIT ...` query + if (getAggInfo() != null + || getHavingPred() != null + || getWithClause() != null) { + return false; + } + if (!analyzer.isRootAnalyzer()) { + // ensure no sub query + return false; + } + // If select stmt has inline view or this is an inline view query stmt analyze call + if (hasInlineView() || analyzer.isInlineViewAnalyzer()) { + return false; + } + // single olap table + List tblRefs = getTableRefs(); + if (tblRefs.size() != 1 || !(tblRefs.get(0) instanceof BaseTableRef)) { + return false; + } + TableRef tbl = tblRefs.get(0); + if (tbl.getTable().getType() != Table.TableType.OLAP) { + return false; + } + LOG.debug("table ref {}", tbl); + // Need enable light schema change, since opt rely on + // column_unique_id of each slot + OlapTable olapTable = (OlapTable) tbl.getTable(); + if (!olapTable.getEnableLightSchemaChange()) { + return false; + } + // Only TOPN query at present + if (getOrderByElements() == null + || !hasLimit() + || getLimit() == 0 + || getLimit() > ConnectContext.get().getSessionVariable().twoPhaseReadLimitThreshold) { + return false; + } + // Check order by exprs are all slot refs + // Rethink? 
implement more generic to support all exprs + LOG.debug("getOrderingExprs {}", sortInfo.getOrderingExprs()); + LOG.debug("getOrderByElements {}", getOrderByElements()); + for (OrderByElement orderby : getOrderByElements()) { + if (!(orderby.getExpr() instanceof SlotRef)) { + return false; + } + } + return true; + } + public List getTableRefIds() { List result = Lists.newArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java index e4682a4586..79026840b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java @@ -69,6 +69,9 @@ public class SlotDescriptor { private boolean isMultiRef; // used for load to get more information of varchar and decimal private Type originType; + // If set to false, then such slots will be ignored during + // materialize them.Used to optmize to read less data and less memory usage + private boolean needMaterialize = true; public SlotDescriptor(SlotId id, TupleDescriptor parent) { this.id = id; @@ -108,6 +111,14 @@ public class SlotDescriptor { return isAgg; } + public void setInvalid() { + this.needMaterialize = false; + } + + public boolean isInvalid() { + return !this.needMaterialize; + } + public void setIsAgg(boolean agg) { isAgg = agg; } @@ -255,6 +266,12 @@ public class SlotDescriptor { return sourceExprs; } + public int getUniqueId() { + if (column == null) { + return -1; + } + return column.getUniqueId(); + } /** * Initializes a slot by setting its source expression information @@ -301,10 +318,11 @@ public class SlotDescriptor { TSlotDescriptor tSlotDescriptor = new TSlotDescriptor(id.asInt(), parent.getId().asInt(), (originType != null ? originType.toThrift() : type.toThrift()), -1, byteOffset, nullIndicatorByte, nullIndicatorBit, ((column != null) ? 
column.getName() : ""), slotIdx, isMaterialized); - + tSlotDescriptor.setNeedMaterialize(needMaterialize); if (column != null) { LOG.debug("column name:{}, column unique id:{}", column.getName(), column.getUniqueId()); tSlotDescriptor.setColUniqueId(column.getUniqueId()); + tSlotDescriptor.setIsKey(column.isKey()); } return tSlotDescriptor; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java index 85afb1f98e..3d16e9af1c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java @@ -122,6 +122,14 @@ public class SlotRef extends Expr { return desc.getId(); } + public void setInvalid() { + this.desc.setInvalid(); + } + + public boolean isInvalid() { + return this.desc.isInvalid(); + } + public Column getColumn() { if (desc == null) { return null; @@ -289,6 +297,7 @@ public class SlotRef extends Expr { protected void toThrift(TExprNode msg) { msg.node_type = TExprNodeType.SLOT_REF; msg.slot_ref = new TSlotRef(desc.getId().asInt(), desc.getParent().getId().asInt()); + msg.slot_ref.setColUniqueId(desc.getUniqueId()); msg.setOutputColumn(outputColumn); } @@ -437,6 +446,10 @@ public class SlotRef extends Expr { this.label = label; } + public boolean hasCol() { + return this.col != null; + } + public String getColumnName() { return col; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java index f090c2af80..7cdd2d0d0d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java @@ -63,6 +63,7 @@ public class SortInfo { // Input expressions materialized into sortTupleDesc_. One expr per slot in // sortTupleDesc_. 
private List sortTupleSlotExprs; + private boolean useTwoPhaseRead = false; public SortInfo(List orderingExprs, List isAscOrder, List nullsFirstParams) { @@ -145,6 +146,14 @@ public class SortInfo { sortTupleDesc = tupleDesc; } + public void setUseTwoPhaseRead() { + useTwoPhaseRead = true; + } + + public boolean useTwoPhaseRead() { + return useTwoPhaseRead; + } + public TupleDescriptor getSortTupleDescriptor() { return sortTupleDesc; } @@ -258,6 +267,7 @@ public class SortInfo { // Update the tuple descriptor used to materialize the input of the sort. setMaterializedTupleInfo(sortTupleDesc, sortTupleExprs); + LOG.debug("sortTupleDesc {}", sortTupleDesc); return substOrderBy; } @@ -285,6 +295,11 @@ public class SortInfo { SlotDescriptor materializedDesc = analyzer.addSlotDescriptor(sortTupleDesc); materializedDesc.initFromExpr(origOrderingExpr); materializedDesc.setIsMaterialized(true); + SlotRef origSlotRef = origOrderingExpr.getSrcSlotRef(); + LOG.debug("origOrderingExpr {}", origOrderingExpr); + if (origSlotRef != null) { + materializedDesc.setColumn(origSlotRef.getColumn()); + } SlotRef materializedRef = new SlotRef(materializedDesc); substOrderBy.put(origOrderingExpr, materializedRef); materializedOrderingExprs.add(origOrderingExpr); @@ -301,6 +316,9 @@ public class SortInfo { Expr.treesToThrift(orderingExprs), isAscOrder, nullsFirstParams); + if (useTwoPhaseRead) { + sortInfo.setUseTwoPhaseRead(true); + } return sortInfo; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java index adae901430..57fff2fb17 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java @@ -58,6 +58,7 @@ public class Column implements Writable, GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(Column.class); public static final String DELETE_SIGN = "__DORIS_DELETE_SIGN__"; public static 
final String SEQUENCE_COL = "__DORIS_SEQUENCE_COL__"; + public static final String ROWID_COL = "__DORIS_ROWID_COL__"; private static final String COLUMN_ARRAY_CHILDREN = "item"; public static final int COLUMN_UNIQUE_ID_INIT_VALUE = -1; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java index d613fd9102..b1d54ec909 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java @@ -21,19 +21,22 @@ package org.apache.doris.planner; import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.SortInfo; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; +import org.apache.doris.catalog.Env; import org.apache.doris.common.UserException; import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.statistics.StatsRecursiveDerive; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TExchangeNode; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TNodeInfo; +import org.apache.doris.thrift.TPaloNodesInfo; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; -import org.apache.doris.thrift.TSortInfo; import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects.ToStringHelper; @@ -161,10 +164,10 @@ public class ExchangeNode extends PlanNode { msg.exchange_node.addToInputRowTuples(tid.asInt()); } if (mergeInfo != null) { - TSortInfo sortInfo = new TSortInfo( - Expr.treesToThrift(mergeInfo.getOrderingExprs()), - mergeInfo.getIsAscOrder(), mergeInfo.getNullsFirst()); - msg.exchange_node.setSortInfo(sortInfo); + msg.exchange_node.setSortInfo(mergeInfo.toThrift()); + if 
(mergeInfo.useTwoPhaseRead()) { + msg.exchange_node.setNodesInfo(createNodesInfo()); + } } msg.exchange_node.setOffset(offset); } @@ -187,4 +190,17 @@ public class ExchangeNode extends PlanNode { return prefix + "offset: " + offset + "\n"; } + /** + * Set the parameters used to fetch data by rowid column + * after init(). + */ + private TPaloNodesInfo createNodesInfo() { + TPaloNodesInfo nodesInfo = new TPaloNodesInfo(); + SystemInfoService systemInfoService = Env.getCurrentSystemInfo(); + for (Long id : systemInfoService.getBackendIds(true /*need alive*/)) { + Backend backend = systemInfoService.getBackend(id); + nodesInfo.addToNodes(new TNodeInfo(backend.getId(), 0, backend.getHost(), backend.getBrpcPort())); + } + return nodesInfo; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 1ecf8cc295..03088cdd18 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -966,6 +966,9 @@ public class OlapScanNode extends ScanNode { sortInfo.getMaterializedOrderingExprs().forEach(expr -> { output.append(prefix).append(prefix).append(expr.toSql()).append("\n"); }); + if (sortInfo.useTwoPhaseRead()) { + output.append(prefix).append("OPT TWO PHASE\n"); + } } if (sortLimit != -1) { output.append(prefix).append("SORT LIMIT: ").append(sortLimit).append("\n"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java index a5472959b3..f54d0a2fdd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java @@ -32,8 +32,10 @@ import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.StatementBase; import org.apache.doris.analysis.StorageBackend; import 
org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.Type; import org.apache.doris.common.UserException; import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.qe.ConnectContext; @@ -239,6 +241,8 @@ public class OriginalPlanner extends Planner { } else { List resExprs = Expr.substituteList(queryStmt.getResultExprs(), rootFragment.getPlanRoot().getOutputSmap(), analyzer, false); + LOG.debug("result Exprs {}", queryStmt.getResultExprs()); + LOG.debug("substitute result Exprs {}", resExprs); rootFragment.setOutputExprs(resExprs); } LOG.debug("finalize plan fragments"); @@ -259,6 +263,9 @@ public class OriginalPlanner extends Planner { isBlockQuery = false; LOG.debug("this isn't block query"); } + if (selectStmt.checkEnableTwoPhaseRead(analyzer)) { + injectRowIdColumnSlot(); + } } } @@ -334,6 +341,52 @@ public class OriginalPlanner extends Planner { topPlanFragment.getPlanRoot().resetTupleIds(Lists.newArrayList(fileStatusDesc.getId())); } + + private SlotDescriptor injectRowIdColumnSlot(Analyzer analyzer, TupleDescriptor tupleDesc) { + SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(tupleDesc); + LOG.debug("inject slot {}", slotDesc); + String name = Column.ROWID_COL; + Column col = new Column(name, Type.STRING, false, null, false, "", + "rowid column"); + slotDesc.setType(Type.STRING); + slotDesc.setColumn(col); + slotDesc.setIsNullable(false); + slotDesc.setIsMaterialized(true); + // Non-nullable slots will have 0 for the byte offset and -1 for the bit mask + slotDesc.setNullIndicatorBit(-1); + slotDesc.setNullIndicatorByte(0); + return slotDesc; + } + + // We use two phase read to optimize sql like: select * from tbl [where xxx = ???] 
order by column1 limit n + // in the first phase, we add an extra column `RowId` to Block, and sort blocks in TopN nodes + // in the second phase, we have n rows, we do a fetch rpc to get all rowids date for the n rows + // and reconconstruct the final block + private void injectRowIdColumnSlot() { + for (PlanFragment fragment : fragments) { + PlanNode node = fragment.getPlanRoot(); + PlanNode parent = null; + // OlapScanNode is the last node. + // So, just get the last two node and check if they are SortNode and OlapScan. + while (node.getChildren().size() != 0) { + parent = node; + node = node.getChildren().get(0); + } + + if (!(node instanceof OlapScanNode) || !(parent instanceof SortNode)) { + continue; + } + SortNode sortNode = (SortNode) parent; + OlapScanNode scanNode = (OlapScanNode) node; + SlotDescriptor slot = injectRowIdColumnSlot(analyzer, scanNode.getTupleDesc()); + injectRowIdColumnSlot(analyzer, sortNode.getSortInfo().getSortTupleDescriptor()); + SlotRef extSlot = new SlotRef(slot); + sortNode.getResolvedTupleExprs().add(extSlot); + sortNode.getSortInfo().setUseTwoPhaseRead(); + break; + } + } + /** * Push sort down to olap scan. 
*/ @@ -354,6 +407,7 @@ public class OriginalPlanner extends Planner { } SortNode sortNode = (SortNode) parent; OlapScanNode scanNode = (OlapScanNode) node; + if (!scanNode.checkPushSort(sortNode)) { continue; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index d64e34deb6..8e74a38bc1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -466,9 +466,6 @@ public abstract class PlanNode extends TreeNode implements PlanStats { } } - - - public String getExplainString() { return getExplainString("", "", TExplainLevel.VERBOSE); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java index 57bd28fd49..a31205a975 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java @@ -142,6 +142,10 @@ public class SortNode extends PlanNode { this.useTopnOpt = useTopnOpt; } + public List getResolvedTupleExprs() { + return resolvedTupleExprs; + } + @Override public void setCompactData(boolean on) { this.compactData = on; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 996b79d944..d1eca98bf6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -254,6 +254,9 @@ public class SessionVariable implements Serializable, Writable { public static final String EXTERNAL_SORT_BYTES_THRESHOLD = "external_sort_bytes_threshold"; + public static final String ENABLE_TWO_PHASE_READ_OPT = "enable_two_phase_read_opt"; + public static final String TWO_PHASE_READ_OPT_LIMIT_THRESHOLD = "two_phase_read_opt_limit_threshold"; + // session origin 
value public Map sessionOriginValue = new HashMap(); // check stmt is or not [select /*+ SET_VAR(...)*/ ...] @@ -659,6 +662,14 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = EXTERNAL_SORT_BYTES_THRESHOLD, checker = "checkExternalSortBytesThreshold") public long externalSortBytesThreshold = 0; + // Whether enable two phase read optimization + // 1. read related rowids along with necessary column data + // 2. spawn fetch RPC to other nodes to get related data by sorted rowids + @VariableMgr.VarAttr(name = ENABLE_TWO_PHASE_READ_OPT) + public boolean enableTwoPhaseReadOpt = true; + @VariableMgr.VarAttr(name = TWO_PHASE_READ_OPT_LIMIT_THRESHOLD) + public long twoPhaseReadLimitThreshold = 512; + // If this fe is in fuzzy mode, then will use initFuzzyModeVariables to generate some variables, // not the default value set in the code. public void initFuzzyModeVariables() { diff --git a/gensrc/proto/descriptors.proto b/gensrc/proto/descriptors.proto index acde58bbfa..4bda9216d8 100644 --- a/gensrc/proto/descriptors.proto +++ b/gensrc/proto/descriptors.proto @@ -34,6 +34,8 @@ message PSlotDescriptor { required string col_name = 8; required int32 slot_idx = 9; required bool is_materialized = 10; + required int32 col_unique_id = 11; + required bool is_key = 12; }; message PTupleDescriptor { diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 50bdb1bb68..413fef6a01 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -538,6 +538,25 @@ message PFetchTableSchemaResult { optional int32 column_nums = 2; repeated string column_names = 3; repeated PTypeDesc column_types = 4; +} + +message PMultiGetRequest { + message RowId { + optional int64 tablet_id = 1; + optional string rowset_id = 2; + optional uint64 segment_id = 3; + optional uint64 ordinal_id = 4; + }; + repeated RowId rowids = 1; + optional PTupleDescriptor desc = 2; + repeated PSlotDescriptor 
slots = 3; + // for compatibility + optional int32 be_exec_version = 4; +}; + +message PMultiGetResponse { + optional PBlock block = 1; + optional PStatus status = 2; }; service PBackendService { @@ -572,5 +591,6 @@ service PBackendService { rpc request_slave_tablet_pull_rowset(PTabletWriteSlaveRequest) returns (PTabletWriteSlaveResult); rpc response_slave_tablet_pull_rowset(PTabletWriteSlaveDoneRequest) returns (PTabletWriteSlaveDoneResult); rpc fetch_table_schema(PFetchTableSchemaRequest) returns (PFetchTableSchemaResult); + rpc multiget_data(PMultiGetRequest) returns (PMultiGetResponse); }; diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 1db9bd0537..2aa3be3f5f 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -51,6 +51,10 @@ struct TSlotDescriptor { 9: required i32 slotIdx 10: required bool isMaterialized 11: optional i32 col_unique_id = -1 + 12: optional bool is_key = false + // If set to false, then such slots will be ignored during + // materialization. Used as an optimization to read less data and use less memory + 13: optional bool need_materialize = true } struct TTupleDescriptor { diff --git a/gensrc/thrift/Exprs.thrift b/gensrc/thrift/Exprs.thrift index 230deb51ff..84dacd0663 100644 --- a/gensrc/thrift/Exprs.thrift +++ b/gensrc/thrift/Exprs.thrift @@ -143,6 +143,7 @@ struct TTupleIsNullPredicate { struct TSlotRef { 1: required Types.TSlotId slot_id 2: required Types.TTupleId tuple_id + 3: optional i32 col_unique_id } struct TStringLiteral { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 5b72633c39..6eed8b46cd 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -543,6 +543,8 @@ struct TSortInfo { // Indicates the nullable info of sort_tuple_slot_exprs is changed after substitute by child's smap 5: optional list slot_exprs_nullability_changed_flags + // Indicates whether the topn query uses two-phase read + 6: optional bool
use_two_phase_read } enum TPushAggOp { @@ -891,6 +893,8 @@ struct TExchangeNode { 2: optional TSortInfo sort_info // This is tHe number of rows to skip before returning results 3: optional i64 offset + // Nodes in this cluster, used for second phase fetch + 4: optional Descriptors.TPaloNodesInfo nodes_info } struct TOlapRewriteNode {