[Bug](topn) Fix topn fetch set real default value (#20074)
1. Before this PR, if a rowset did not contain a column that had to be read for the related SlotDescriptor, `insert_default` was called on the column, but that does not produce the column's real default value. The information needed to build the real default value should be provided by the frontend side. 2. Support fetch when light schema change is not enabled, but disable it for the AGG and UNIQUE MOR models.
This commit is contained in:
@ -22,6 +22,7 @@
|
||||
#include <fmt/format.h>
|
||||
#include <gen_cpp/data.pb.h>
|
||||
#include <gen_cpp/internal_service.pb.h>
|
||||
#include <gen_cpp/olap_file.pb.h>
|
||||
#include <gen_cpp/types.pb.h>
|
||||
#include <glog/logging.h>
|
||||
#include <stddef.h>
|
||||
@ -40,6 +41,7 @@
|
||||
#include "common/consts.h"
|
||||
#include "exec/tablet_info.h" // DorisNodesInfo
|
||||
#include "olap/olap_common.h"
|
||||
#include "olap/tablet_schema.h"
|
||||
#include "olap/utils.h"
|
||||
#include "runtime/descriptors.h"
|
||||
#include "runtime/exec_env.h" // ExecEnv
|
||||
@ -96,6 +98,11 @@ PMultiGetRequest RowIDFetcher::_init_fetch_request(const vectorized::ColumnStrin
|
||||
row_loc.set_ordinal_id(location->row_location.row_id);
|
||||
*mget_req.add_row_locs() = std::move(row_loc);
|
||||
}
|
||||
// Set column desc
|
||||
for (const TColumn& tcolumn : _fetch_option.t_fetch_opt.column_desc) {
|
||||
TabletColumn column(tcolumn);
|
||||
column.to_schema_pb(mget_req.add_column_desc());
|
||||
}
|
||||
PUniqueId& query_id = *mget_req.mutable_query_id();
|
||||
query_id.set_hi(_fetch_option.runtime_state->query_id().hi);
|
||||
query_id.set_lo(_fetch_option.runtime_state->query_id().lo);
|
||||
|
||||
@ -1483,6 +1483,12 @@ Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request,
|
||||
desc.add_slot(&slots.back());
|
||||
}
|
||||
|
||||
// init read schema
|
||||
TabletSchema full_read_schema;
|
||||
for (const ColumnPB& column_pb : request.column_desc()) {
|
||||
full_read_schema.append_column(TabletColumn(column_pb));
|
||||
}
|
||||
|
||||
// read row by row
|
||||
for (size_t i = 0; i < request.row_locs_size(); ++i) {
|
||||
const auto& row_loc = request.row_locs(i);
|
||||
@ -1508,10 +1514,6 @@ Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request,
|
||||
<< ", row_size:" << row_size;
|
||||
*response->add_row_locs() = row_loc;
|
||||
});
|
||||
const TabletSchemaSPtr tablet_schema = rowset->tablet_schema();
|
||||
VLOG_DEBUG << "get tablet schema column_num:" << tablet_schema->num_columns()
|
||||
<< ", version:" << tablet_schema->schema_version()
|
||||
<< ", cost(us):" << watch.elapsed_time() / 1000;
|
||||
SegmentCacheHandle segment_cache;
|
||||
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(rowset, &segment_cache, true));
|
||||
// find segment
|
||||
@ -1541,17 +1543,24 @@ Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request,
|
||||
result_block = vectorized::Block(desc.slots(), request.row_locs().size());
|
||||
}
|
||||
for (int x = 0; x < desc.slots().size(); ++x) {
|
||||
int index = tablet_schema->field_index(desc.slots()[x]->col_unique_id());
|
||||
int index = -1;
|
||||
if (desc.slots()[x]->col_unique_id() >= 0) {
|
||||
// light sc enabled
|
||||
index = full_read_schema.field_index(desc.slots()[x]->col_unique_id());
|
||||
} else {
|
||||
index = full_read_schema.field_index(desc.slots()[x]->col_name());
|
||||
}
|
||||
if (index < 0) {
|
||||
std::stringstream ss;
|
||||
ss << "field name is invalid. field=" << desc.slots()[x]->col_name()
|
||||
<< ", field_name_to_index=" << full_read_schema.get_all_field_names();
|
||||
return Status::InternalError(ss.str());
|
||||
}
|
||||
segment_v2::ColumnIterator* column_iterator = nullptr;
|
||||
vectorized::MutableColumnPtr column =
|
||||
result_block.get_by_position(x).column->assume_mutable();
|
||||
if (index < 0) {
|
||||
column->insert_default();
|
||||
continue;
|
||||
} else {
|
||||
RETURN_IF_ERROR(segment->new_column_iterator(tablet_schema->column(index),
|
||||
&column_iterator));
|
||||
}
|
||||
RETURN_IF_ERROR(
|
||||
segment->new_column_iterator(full_read_schema.column(index), &column_iterator));
|
||||
std::unique_ptr<segment_v2::ColumnIterator> ptr_guard(column_iterator);
|
||||
segment_v2::ColumnIteratorOptions opt;
|
||||
OlapReaderStatistics stats;
|
||||
|
||||
@ -753,7 +753,8 @@ public class SelectStmt extends QueryStmt {
|
||||
// Need enable light schema change, since opt rely on
|
||||
// column_unique_id of each slot
|
||||
OlapTable olapTable = (OlapTable) tbl.getTable();
|
||||
if (!olapTable.getEnableLightSchemaChange()) {
|
||||
if (!olapTable.isDupKeysOrMergeOnWrite()) {
|
||||
LOG.debug("only support duplicate key or MOW model");
|
||||
return false;
|
||||
}
|
||||
if (getOrderByElements() != null) {
|
||||
|
||||
@ -154,6 +154,7 @@ import org.apache.doris.planner.external.HiveScanNode;
|
||||
import org.apache.doris.planner.external.HudiScanNode;
|
||||
import org.apache.doris.planner.external.iceberg.IcebergScanNode;
|
||||
import org.apache.doris.tablefunction.TableValuedFunctionIf;
|
||||
import org.apache.doris.thrift.TColumn;
|
||||
import org.apache.doris.thrift.TFetchOption;
|
||||
import org.apache.doris.thrift.TPartitionType;
|
||||
import org.apache.doris.thrift.TPushAggOp;
|
||||
@ -216,6 +217,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
boolean needFetch = false;
|
||||
// Only single olap table should be fetched
|
||||
OlapTable fetchOlapTable = null;
|
||||
OlapScanNode scanNode = null;
|
||||
for (PlanFragment fragment : context.getPlanFragments()) {
|
||||
PlanNode node = fragment.getPlanRoot();
|
||||
PlanNode parent = null;
|
||||
@ -229,7 +231,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
// case1: general topn optimized query
|
||||
if ((node instanceof OlapScanNode) && (parent instanceof SortNode)) {
|
||||
SortNode sortNode = (SortNode) parent;
|
||||
OlapScanNode scanNode = (OlapScanNode) node;
|
||||
scanNode = (OlapScanNode) node;
|
||||
if (sortNode.getUseTwoPhaseReadOpt()) {
|
||||
needFetch = true;
|
||||
fetchOlapTable = scanNode.getOlapTable();
|
||||
@ -243,6 +245,12 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
fetchOption.setFetchRowStore(fetchOlapTable.storeRowColumn());
|
||||
fetchOption.setUseTwoPhaseFetch(true);
|
||||
fetchOption.setNodesInfo(Env.getCurrentSystemInfo().createAliveNodesInfo());
|
||||
if (!fetchOlapTable.storeRowColumn()) {
|
||||
// Set column desc for each column
|
||||
List<TColumn> columnsDesc = new ArrayList<TColumn>();
|
||||
scanNode.getColumnDesc(columnsDesc, null, null);
|
||||
fetchOption.setColumnDesc(columnsDesc);
|
||||
}
|
||||
((ResultSink) fragment.getSink()).setFetchOption(fetchOption);
|
||||
break;
|
||||
}
|
||||
|
||||
@ -1256,25 +1256,35 @@ public class OlapScanNode extends ScanNode {
|
||||
return shouldColoScan;
|
||||
}
|
||||
|
||||
public void getColumnDesc(List<TColumn> columnsDesc, List<String> keyColumnNames,
|
||||
List<TPrimitiveType> keyColumnTypes) {
|
||||
if (selectedIndexId != -1) {
|
||||
for (Column col : olapTable.getSchemaByIndexId(selectedIndexId, true)) {
|
||||
TColumn tColumn = col.toThrift();
|
||||
col.setIndexFlag(tColumn, olapTable);
|
||||
if (columnsDesc != null) {
|
||||
columnsDesc.add(tColumn);
|
||||
}
|
||||
if ((Util.showHiddenColumns() || (!Util.showHiddenColumns() && col.isVisible())) && col.isKey()) {
|
||||
if (keyColumnNames != null) {
|
||||
keyColumnNames.add(col.getName());
|
||||
}
|
||||
if (keyColumnTypes != null) {
|
||||
keyColumnTypes.add(col.getDataType().toThrift());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void toThrift(TPlanNode msg) {
|
||||
List<String> keyColumnNames = new ArrayList<String>();
|
||||
List<TPrimitiveType> keyColumnTypes = new ArrayList<TPrimitiveType>();
|
||||
List<TColumn> columnsDesc = new ArrayList<TColumn>();
|
||||
getColumnDesc(columnsDesc, keyColumnNames, keyColumnTypes);
|
||||
List<TOlapTableIndex> indexDesc = Lists.newArrayList();
|
||||
|
||||
if (selectedIndexId != -1) {
|
||||
for (Column col : olapTable.getSchemaByIndexId(selectedIndexId, true)) {
|
||||
TColumn tColumn = col.toThrift();
|
||||
col.setIndexFlag(tColumn, olapTable);
|
||||
columnsDesc.add(tColumn);
|
||||
if ((Util.showHiddenColumns() || (!Util.showHiddenColumns() && col.isVisible())) && col.isKey()) {
|
||||
keyColumnNames.add(col.getName());
|
||||
keyColumnTypes.add(col.getDataType().toThrift());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (Index index : olapTable.getIndexes()) {
|
||||
TOlapTableIndex tIndex = index.toThrift();
|
||||
indexDesc.add(tIndex);
|
||||
|
||||
@ -52,6 +52,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
|
||||
import org.apache.doris.statistics.query.StatsDelta;
|
||||
import org.apache.doris.thrift.TColumn;
|
||||
import org.apache.doris.thrift.TFetchOption;
|
||||
import org.apache.doris.thrift.TQueryOptions;
|
||||
import org.apache.doris.thrift.TRuntimeFilterMode;
|
||||
@ -476,6 +477,7 @@ public class OriginalPlanner extends Planner {
|
||||
private void injectRowIdColumnSlot() {
|
||||
boolean injected = false;
|
||||
OlapTable olapTable = null;
|
||||
OlapScanNode scanNode = null;
|
||||
for (PlanFragment fragment : fragments) {
|
||||
PlanNode node = fragment.getPlanRoot();
|
||||
PlanNode parent = null;
|
||||
@ -489,7 +491,7 @@ public class OriginalPlanner extends Planner {
|
||||
// case1
|
||||
if ((node instanceof OlapScanNode) && (parent instanceof SortNode)) {
|
||||
SortNode sortNode = (SortNode) parent;
|
||||
OlapScanNode scanNode = (OlapScanNode) node;
|
||||
scanNode = (OlapScanNode) node;
|
||||
SlotDescriptor slot = injectRowIdColumnSlot(analyzer, scanNode.getTupleDesc());
|
||||
injectRowIdColumnSlot(analyzer, sortNode.getSortInfo().getSortTupleDescriptor());
|
||||
SlotRef extSlot = new SlotRef(slot);
|
||||
@ -501,7 +503,7 @@ public class OriginalPlanner extends Planner {
|
||||
}
|
||||
// case2
|
||||
if ((node instanceof OlapScanNode) && parent == null) {
|
||||
OlapScanNode scanNode = (OlapScanNode) node;
|
||||
scanNode = (OlapScanNode) node;
|
||||
injectRowIdColumnSlot(analyzer, scanNode.getTupleDesc());
|
||||
injected = true;
|
||||
olapTable = scanNode.getOlapTable();
|
||||
@ -514,6 +516,13 @@ public class OriginalPlanner extends Planner {
|
||||
fetchOption.setFetchRowStore(olapTable.storeRowColumn());
|
||||
fetchOption.setUseTwoPhaseFetch(true);
|
||||
fetchOption.setNodesInfo(Env.getCurrentSystemInfo().createAliveNodesInfo());
|
||||
// TODO: for row store, use a separate, faster path for wide tables
|
||||
if (!olapTable.storeRowColumn()) {
|
||||
// Set column desc for each column
|
||||
List<TColumn> columnsDesc = new ArrayList<TColumn>();
|
||||
scanNode.getColumnDesc(columnsDesc, null, null);
|
||||
fetchOption.setColumnDesc(columnsDesc);
|
||||
}
|
||||
((ResultSink) fragment.getSink()).setFetchOption(fetchOption);
|
||||
break;
|
||||
}
|
||||
|
||||
@ -629,6 +629,7 @@ message PMultiGetRequest {
|
||||
optional int32 be_exec_version = 4;
|
||||
optional bool fetch_row_store = 5;
|
||||
optional PUniqueId query_id = 6;
|
||||
repeated ColumnPB column_desc = 7;
|
||||
};
|
||||
|
||||
message PMultiGetResponse {
|
||||
|
||||
@ -166,6 +166,8 @@ struct TFetchOption {
|
||||
2: optional Descriptors.TPaloNodesInfo nodes_info;
|
||||
// Whether fetch row store
|
||||
3: optional bool fetch_row_store;
|
||||
// Fetch schema
|
||||
4: optional list<Descriptors.TColumn> column_desc;
|
||||
}
|
||||
|
||||
struct TResultSink {
|
||||
|
||||
@ -59,3 +59,9 @@ z
|
||||
2023-03-21T07:00 area1 p0 aaaaa ddddd2 100.000 100.000 100.000 100.000 2023-03-21T17:00
|
||||
2023-03-21T06:00 area1 p0 aaaaa ddddd1 100.000 100.000 100.000 100.000 2023-03-21T17:00
|
||||
|
||||
-- !sql --
|
||||
1 1024
|
||||
2 1024
|
||||
3 \N
|
||||
3 0
|
||||
|
||||
|
||||
@ -100,4 +100,22 @@ suite("sort") {
|
||||
"""
|
||||
|
||||
qt_sql_orderby_non_overlap_desc """ select * from sort_non_overlap order by time_period desc limit 4; """
|
||||
|
||||
sql """ DROP TABLE if exists `sort_default_value`; """
|
||||
sql """ CREATE TABLE `sort_default_value` (
|
||||
`k1` int NOT NULL
|
||||
) ENGINE=OLAP
|
||||
DUPLICATE KEY(`k1`)
|
||||
DISTRIBUTED BY HASH(`k1`) BUCKETS 1
|
||||
PROPERTIES (
|
||||
"replication_allocation" = "tag.location.default: 1",
|
||||
"disable_auto_compaction" = "true"
|
||||
);
|
||||
"""
|
||||
sql "insert into sort_default_value values (1)"
|
||||
sql "insert into sort_default_value values (2)"
|
||||
sql """ alter table sort_default_value add column k4 INT default "1024" """
|
||||
sql "insert into sort_default_value values (3, 0)"
|
||||
sql "insert into sort_default_value values (3, null)"
|
||||
qt_sql "select * from sort_default_value order by k1 limit 10"
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user