From 317338913cb2fc545bcda1cde47da7c45e71b340 Mon Sep 17 00:00:00 2001 From: lihangyu <15605149486@163.com> Date: Fri, 26 May 2023 16:06:55 +0800 Subject: [PATCH] [Bug](topn) Fix topn fetch set real default value (#20074) 1. Before this PR if rowset does not contain column which should be read for related SlotDescriptor will call `insert_default` to column, but it's not this real defautl value.Real default value relevant information should be provided by the frontend side. 2. Support fetch when light schema change is not enabled, but disable for AGG or UNIQUE MOR model --- be/src/exec/rowid_fetcher.cpp | 7 ++++ be/src/service/internal_service.cpp | 33 +++++++++++------- .../org/apache/doris/analysis/SelectStmt.java | 3 +- .../translator/PhysicalPlanTranslator.java | 10 +++++- .../apache/doris/planner/OlapScanNode.java | 34 ++++++++++++------- .../apache/doris/planner/OriginalPlanner.java | 13 +++++-- gensrc/proto/internal_service.proto | 1 + gensrc/thrift/DataSinks.thrift | 2 ++ regression-test/data/query_p0/sort/sort.out | 6 ++++ .../suites/query_p0/sort/sort.groovy | 18 ++++++++++ 10 files changed, 99 insertions(+), 28 deletions(-) diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp index 28adba4ec2..7a866f0773 100644 --- a/be/src/exec/rowid_fetcher.cpp +++ b/be/src/exec/rowid_fetcher.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -40,6 +41,7 @@ #include "common/consts.h" #include "exec/tablet_info.h" // DorisNodesInfo #include "olap/olap_common.h" +#include "olap/tablet_schema.h" #include "olap/utils.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" // ExecEnv @@ -96,6 +98,11 @@ PMultiGetRequest RowIDFetcher::_init_fetch_request(const vectorized::ColumnStrin row_loc.set_ordinal_id(location->row_location.row_id); *mget_req.add_row_locs() = std::move(row_loc); } + // Set column desc + for (const TColumn& tcolumn : _fetch_option.t_fetch_opt.column_desc) { + TabletColumn column(tcolumn); + column.to_schema_pb(mget_req.add_column_desc()); + } PUniqueId& query_id = *mget_req.mutable_query_id(); query_id.set_hi(_fetch_option.runtime_state->query_id().hi); query_id.set_lo(_fetch_option.runtime_state->query_id().lo); diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 5994925301..1bd05dbe64 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1483,6 +1483,12 @@ Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request, desc.add_slot(&slots.back()); } + // init read schema + TabletSchema full_read_schema; + for (const ColumnPB& column_pb : request.column_desc()) { + full_read_schema.append_column(TabletColumn(column_pb)); + } + // read row by row for (size_t i = 0; i < request.row_locs_size(); ++i) { const auto& row_loc = request.row_locs(i); @@ -1508,10 +1514,6 @@ Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request, << ", row_size:" << row_size; *response->add_row_locs() = row_loc; }); - const TabletSchemaSPtr tablet_schema = rowset->tablet_schema(); - VLOG_DEBUG << "get tablet schema column_num:" << tablet_schema->num_columns() - << ", version:" << tablet_schema->schema_version() - << ", cost(us):" << watch.elapsed_time() / 1000; SegmentCacheHandle segment_cache; RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(rowset, &segment_cache, true)); // find segment @@ -1541,17 +1543,24 @@ Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request, result_block = vectorized::Block(desc.slots(), request.row_locs().size()); } for (int x = 0; x < desc.slots().size(); ++x) { - int index = tablet_schema->field_index(desc.slots()[x]->col_unique_id()); + int index = -1; + if (desc.slots()[x]->col_unique_id() >= 0) { + // light sc enabled + index = full_read_schema.field_index(desc.slots()[x]->col_unique_id()); + } else { + index = full_read_schema.field_index(desc.slots()[x]->col_name()); + } + if (index < 0) { + std::stringstream ss; + ss << "field name is invalid. field=" << desc.slots()[x]->col_name() + << ", field_name_to_index=" << full_read_schema.get_all_field_names(); + return Status::InternalError(ss.str()); + } segment_v2::ColumnIterator* column_iterator = nullptr; vectorized::MutableColumnPtr column = result_block.get_by_position(x).column->assume_mutable(); - if (index < 0) { - column->insert_default(); - continue; - } else { - RETURN_IF_ERROR(segment->new_column_iterator(tablet_schema->column(index), - &column_iterator)); - } + RETURN_IF_ERROR( + segment->new_column_iterator(full_read_schema.column(index), &column_iterator)); std::unique_ptr ptr_guard(column_iterator); segment_v2::ColumnIteratorOptions opt; OlapReaderStatistics stats; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java index f1c7f920c6..d83c38445e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java @@ -753,7 +753,8 @@ public class SelectStmt extends QueryStmt { // Need enable light schema change, since opt rely on // column_unique_id of each slot OlapTable olapTable = (OlapTable) tbl.getTable(); - if (!olapTable.getEnableLightSchemaChange()) { + if (!olapTable.isDupKeysOrMergeOnWrite()) { + LOG.debug("only support duplicate key or MOW model"); return false; } if (getOrderByElements() != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 9e567bbe36..e2f04c6ea6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -154,6 +154,7 @@ import org.apache.doris.planner.external.HiveScanNode; import org.apache.doris.planner.external.HudiScanNode; import org.apache.doris.planner.external.iceberg.IcebergScanNode; import org.apache.doris.tablefunction.TableValuedFunctionIf; +import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TFetchOption; import org.apache.doris.thrift.TPartitionType; import org.apache.doris.thrift.TPushAggOp; @@ -216,6 +217,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor columnsDesc = new ArrayList(); + scanNode.getColumnDesc(columnsDesc, null, null); + fetchOption.setColumnDesc(columnsDesc); + } ((ResultSink) fragment.getSink()).setFetchOption(fetchOption); break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index b9884211f3..af673a9847 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -1256,25 +1256,35 @@ public class OlapScanNode extends ScanNode { return shouldColoScan; } + public void getColumnDesc(List columnsDesc, List keyColumnNames, + List keyColumnTypes) { + if (selectedIndexId != -1) { + for (Column col : olapTable.getSchemaByIndexId(selectedIndexId, true)) { + TColumn tColumn = col.toThrift(); + col.setIndexFlag(tColumn, olapTable); + if (columnsDesc != null) { + columnsDesc.add(tColumn); + } + if ((Util.showHiddenColumns() || (!Util.showHiddenColumns() && col.isVisible())) && col.isKey()) { + if (keyColumnNames != null) { + keyColumnNames.add(col.getName()); + } + if (keyColumnTypes != null) { + keyColumnTypes.add(col.getDataType().toThrift()); + } + } + } + } + } + @Override protected void toThrift(TPlanNode msg) { List keyColumnNames = new ArrayList(); List keyColumnTypes = new ArrayList(); List columnsDesc = new ArrayList(); + getColumnDesc(columnsDesc, keyColumnNames, keyColumnTypes); List indexDesc = Lists.newArrayList(); - if (selectedIndexId != -1) { - for (Column col : olapTable.getSchemaByIndexId(selectedIndexId, true)) { - TColumn tColumn = col.toThrift(); - col.setIndexFlag(tColumn, olapTable); - columnsDesc.add(tColumn); - if ((Util.showHiddenColumns() || (!Util.showHiddenColumns() && col.isVisible())) && col.isKey()) { - keyColumnNames.add(col.getName()); - keyColumnTypes.add(col.getDataType().toThrift()); - } - } - } - for (Index index : olapTable.getIndexes()) { TOlapTableIndex tIndex = index.toThrift(); indexDesc.add(tIndex); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java index 42e72d6c84..e322b619ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java @@ -52,6 +52,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; import org.apache.doris.statistics.query.StatsDelta; +import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TFetchOption; import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.thrift.TRuntimeFilterMode; @@ -476,6 +477,7 @@ public class OriginalPlanner extends Planner { private void injectRowIdColumnSlot() { boolean injected = false; OlapTable olapTable = null; + OlapScanNode scanNode = null; for (PlanFragment fragment : fragments) { PlanNode node = fragment.getPlanRoot(); PlanNode parent = null; @@ -489,7 +491,7 @@ public class OriginalPlanner extends Planner { // case1 if ((node instanceof OlapScanNode) && (parent instanceof SortNode)) { SortNode sortNode = (SortNode) parent; - OlapScanNode scanNode = (OlapScanNode) node; + scanNode = (OlapScanNode) node; SlotDescriptor slot = injectRowIdColumnSlot(analyzer, scanNode.getTupleDesc()); injectRowIdColumnSlot(analyzer, sortNode.getSortInfo().getSortTupleDescriptor()); SlotRef extSlot = new SlotRef(slot); @@ -501,7 +503,7 @@ public class OriginalPlanner extends Planner { } // case2 if ((node instanceof OlapScanNode) && parent == null) { - OlapScanNode scanNode = (OlapScanNode) node; + scanNode = (OlapScanNode) node; injectRowIdColumnSlot(analyzer, scanNode.getTupleDesc()); injected = true; olapTable = scanNode.getOlapTable(); @@ -514,6 +516,13 @@ public class OriginalPlanner extends Planner { fetchOption.setFetchRowStore(olapTable.storeRowColumn()); fetchOption.setUseTwoPhaseFetch(true); fetchOption.setNodesInfo(Env.getCurrentSystemInfo().createAliveNodesInfo()); + // TODO for row store used seperate more faster path for wide tables + if (!olapTable.storeRowColumn()) { + // Set column desc for each column + List columnsDesc = new ArrayList(); + scanNode.getColumnDesc(columnsDesc, null, null); + fetchOption.setColumnDesc(columnsDesc); + } ((ResultSink) fragment.getSink()).setFetchOption(fetchOption); break; } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index a04cc914c1..9ace031873 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -629,6 +629,7 @@ message PMultiGetRequest { optional int32 be_exec_version = 4; optional bool fetch_row_store = 5; optional PUniqueId query_id = 6; + repeated ColumnPB column_desc = 7; }; message PMultiGetResponse { diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 7883aab0b6..23e21b6715 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -166,6 +166,8 @@ struct TFetchOption { 2: optional Descriptors.TPaloNodesInfo nodes_info; // Whether fetch row store 3: optional bool fetch_row_store; + // Fetch schema + 4: optional list column_desc; } struct TResultSink { diff --git a/regression-test/data/query_p0/sort/sort.out b/regression-test/data/query_p0/sort/sort.out index e78ae593eb..87b7e89cae 100644 --- a/regression-test/data/query_p0/sort/sort.out +++ b/regression-test/data/query_p0/sort/sort.out @@ -59,3 +59,9 @@ z 2023-03-21T07:00 area1 p0 aaaaa ddddd2 100.000 100.000 100.000 100.000 2023-03-21T17:00 2023-03-21T06:00 area1 p0 aaaaa ddddd1 100.000 100.000 100.000 100.000 2023-03-21T17:00 +-- !sql -- +1 1024 +2 1024 +3 \N +3 0 + diff --git a/regression-test/suites/query_p0/sort/sort.groovy b/regression-test/suites/query_p0/sort/sort.groovy index c3fdd818e7..fe6ec5a8d1 100644 --- a/regression-test/suites/query_p0/sort/sort.groovy +++ b/regression-test/suites/query_p0/sort/sort.groovy @@ -100,4 +100,22 @@ suite("sort") { """ qt_sql_orderby_non_overlap_desc """ select * from sort_non_overlap order by time_period desc limit 4; """ + + sql """ DROP TABLE if exists `sort_default_value`; """ + sql """ CREATE TABLE `sort_default_value` ( + `k1` int NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true" + ); + """ + sql "insert into sort_default_value values (1)" + sql "insert into sort_default_value values (2)" + sql """ alter table sort_default_value add column k4 INT default "1024" """ + sql "insert into sort_default_value values (3, 0)" + sql "insert into sort_default_value values (3, null)" + qt_sql "select * from sort_default_value order by k1 limit 10" }