diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index 4675d668f4..c9da7359a3 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -341,7 +341,19 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id, indexes[i], ori_tablet_schema); } if (_tablet_schema->schema_version() > ori_tablet_schema.schema_version()) { - _tablet->update_max_version_schema(_tablet_schema); + // After a schema change, the max version schema should also include the extracted columns. + // For example: a table has two columns, k and v. + // After adding a column v2, the schema version increases and max_version_schema needs to be updated; + // _tablet_schema then includes k, v, and v2. + // If v is a variant, the columns decomposed from v must be added to _tablet_schema as well. + if (_tablet_schema->num_variant_columns() > 0) { + TabletSchemaSPtr max_version_schema = std::make_shared<TabletSchema>(); + max_version_schema->copy_from(*_tablet_schema); + max_version_schema->copy_extracted_columns(ori_tablet_schema); + _tablet->update_max_version_schema(max_version_schema); + } else { + _tablet->update_max_version_schema(_tablet_schema); + } } _tablet_schema->set_table_id(table_schema_param->table_id()); diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index e8f9d5d52b..7d1abd8aa9 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -1031,6 +1031,33 @@ bool TabletSchema::is_dropped_column(const TabletColumn& col) const { column(col.name()).unique_id() != col.unique_id(); } +void TabletSchema::copy_extracted_columns(const TabletSchema& src_schema) { + std::unordered_set<int32_t> variant_columns; + for (const auto& col : columns()) { + if (col.is_variant_type()) { + variant_columns.insert(col.unique_id()); + } + } + for (const TabletColumn& col : src_schema.columns()) { + if (col.is_extracted_column() && variant_columns.contains(col.parent_unique_id())) { + ColumnPB col_pb; + col.to_schema_pb(&col_pb); + TabletColumn new_col(col_pb); + append_column(new_col, ColumnType::VARIANT); + } + } +} + +void TabletSchema::reserve_extracted_columns() { + for (auto it = _cols.begin(); it != _cols.end();) { + if (!it->is_extracted_column()) { + it = _cols.erase(it); + } else { + ++it; + } + } +} + void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_schema_pb) const { for (const auto& i : _cluster_key_idxes) { tablet_schema_pb->add_cluster_key_idxes(i); diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index 072bebd95a..b0de391298 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -331,6 +331,12 @@ public: bool is_dropped_column(const TabletColumn& col) const; + // copy the extracted columns from src_schema + void copy_extracted_columns(const TabletSchema& src_schema); + + // keep only the extracted columns, dropping all others + void reserve_extracted_columns(); + string get_all_field_names() const { string str = "["; for (auto p : _field_name_to_index) { diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index e541c738be..987a210689 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -95,6 +96,7 @@ #include "runtime/stream_load/stream_load_context.h" #include "runtime/thread_context.h" #include "runtime/types.h" +#include "service/backend_options.h" #include "service/point_query_executor.h" #include "util/arrow/row_batch.h" #include "util/async_io.h" @@ -113,6 +115,7 @@ #include
"util/uid_util.h" #include "vec/columns/column.h" #include "vec/columns/column_string.h" +#include "vec/common/schema_util.h" #include "vec/core/block.h" #include "vec/core/column_with_type_and_name.h" #include "vec/data_types/data_type.h" @@ -852,6 +855,115 @@ void PInternalServiceImpl::_get_column_ids_by_tablet_ids( response->mutable_status()->set_status_code(TStatusCode::OK); } +template +struct AsyncRPCContext { + RPCResponse response; + brpc::Controller cntl; + brpc::CallId cid; +}; + +void PInternalServiceImpl::fetch_remote_tablet_schema(google::protobuf::RpcController* controller, + const PFetchRemoteSchemaRequest* request, + PFetchRemoteSchemaResponse* response, + google::protobuf::Closure* done) { + bool ret = _heavy_work_pool.try_offer([request, response, done]() { + brpc::ClosureGuard closure_guard(done); + Status st = Status::OK(); + if (request->is_coordinator()) { + // Spawn rpc request to none coordinator nodes, and finally merge them all + PFetchRemoteSchemaRequest remote_request(*request); + // set it none coordinator to get merged schema + remote_request.set_is_coordinator(false); + using PFetchRemoteTabletSchemaRpcContext = AsyncRPCContext; + std::vector rpc_contexts( + request->tablet_location_size()); + for (int i = 0; i < request->tablet_location_size(); ++i) { + std::string host = request->tablet_location(i).host(); + int32_t brpc_port = request->tablet_location(i).brpc_port(); + std::shared_ptr stub( + ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client( + host, brpc_port)); + rpc_contexts[i].cid = rpc_contexts[i].cntl.call_id(); + stub->fetch_remote_tablet_schema(&rpc_contexts[i].cntl, &remote_request, + &rpc_contexts[i].response, brpc::DoNothing()); + } + std::vector schemas; + for (auto& rpc_context : rpc_contexts) { + brpc::Join(rpc_context.cid); + if (!st.ok()) { + // make sure all flying rpc request is joined + continue; + } + if (rpc_context.cntl.Failed()) { + LOG(WARNING) << "fetch_remote_tablet_schema rpc err:" + << rpc_context.cntl.ErrorText(); + ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( + rpc_context.cntl.remote_side()); + st = Status::InternalError("fetch_remote_tablet_schema rpc err: {}", + rpc_context.cntl.ErrorText()); + } + if (rpc_context.response.status().status_code() != 0) { + st = Status::create(rpc_context.response.status()); + } + if (rpc_context.response.has_merged_schema()) { + TabletSchemaSPtr schema = std::make_shared(); + schema->init_from_pb(rpc_context.response.merged_schema()); + schemas.push_back(schema); + } + } + if (!schemas.empty() && st.ok()) { + // merge all + TabletSchemaSPtr merged_schema; + static_cast(vectorized::schema_util::get_least_common_schema(schemas, nullptr, + merged_schema)); + VLOG_DEBUG << "dump schema:" << merged_schema->dump_structure(); + merged_schema->reserve_extracted_columns(); + merged_schema->to_schema_pb(response->mutable_merged_schema()); + } + st.to_protobuf(response->mutable_status()); + return; + } else { + // This is not a coordinator, get it's tablet and merge schema + std::vector target_tablets; + for (int i = 0; i < request->tablet_location_size(); ++i) { + const auto& location = request->tablet_location(i); + auto backend = BackendOptions::get_local_backend(); + // If this is the target backend + if (backend.host == location.host() && config::brpc_port == location.brpc_port()) { + target_tablets.assign(location.tablet_id().begin(), location.tablet_id().end()); + break; + } + } + if (!target_tablets.empty()) { + std::vector tablet_schemas; + for (int64_t tablet_id 
: target_tablets) { + TabletSharedPtr tablet = + StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, + false); + if (tablet == nullptr) { + // just ignore + LOG(WARNING) << "tablet does not exist, tablet id is " << tablet_id; + continue; + } + tablet_schemas.push_back(tablet->tablet_schema()); + } + if (!tablet_schemas.empty()) { + // merge all + TabletSchemaSPtr merged_schema; + static_cast<void>(vectorized::schema_util::get_least_common_schema( + tablet_schemas, nullptr, merged_schema)); + merged_schema->to_schema_pb(response->mutable_merged_schema()); + VLOG_DEBUG << "dump schema:" << merged_schema->dump_structure(); + } + } + st.to_protobuf(response->mutable_status()); + } + }); + if (!ret) { + offer_failed(response, done, _heavy_work_pool); + } +} + void PInternalServiceImpl::report_stream_load_status(google::protobuf::RpcController* controller, const PReportStreamLoadStatusRequest* request, PReportStreamLoadStatusResponse* response, diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 5713faabab..15d121f2f1 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -204,6 +204,10 @@ public: const PGroupCommitInsertRequest* request, PGroupCommitInsertResponse* response, google::protobuf::Closure* done) override; + void fetch_remote_tablet_schema(google::protobuf::RpcController* controller, + const PFetchRemoteSchemaRequest* request, + PFetchRemoteSchemaResponse* response, + google::protobuf::Closure* done) override; void get_wal_queue_size(google::protobuf::RpcController* controller, const PGetWalQueueSizeRequest* request, diff --git a/docs/en/docs/advanced/variables.md b/docs/en/docs/advanced/variables.md index 871a81e402..030e228f52 100644 --- a/docs/en/docs/advanced/variables.md +++ b/docs/en/docs/advanced/variables.md @@ -701,6 +701,10 @@ Note that the comment must start with /*+ and can only follow the SELECT. Whether to enable partial columns update semantics for native insert into statement, default is false. Please note that the default value of the session variable `enable_insert_strict`, which controls whether the insert statement operates in strict mode, is true. In other words, the insert statement is in strict mode by default, and in this mode, updating non-existing keys in partial column updates is not allowed. Therefore, when using the insert statement for partial columns update and wishing to insert non-existing keys, you need to set `enable_unique_key_partial_update` to true and simultaneously set `enable_insert_strict` to false. +* `describe_extend_variant_column` + + Controls whether `DESC table_name` also lists the sub-columns extracted from variant columns. The default value is false.
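For illustration, a minimal usage sketch of this variable (the table name `tbl` is hypothetical, not part of the patch):

```sql
SET describe_extend_variant_column = true;
-- DESC now also lists the sub-columns extracted from each variant column
-- (for example v.a, v.xxxx), in addition to the variant column v itself.
DESC tbl;
```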
+ *** #### Supplementary instructions on statement execution timeout control diff --git a/docs/zh-CN/docs/advanced/variables.md b/docs/zh-CN/docs/advanced/variables.md index 43daa14af8..2a697b01bf 100644 --- a/docs/zh-CN/docs/advanced/variables.md +++ b/docs/zh-CN/docs/advanced/variables.md @@ -689,6 +689,10 @@ try (Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:9030/ 是否在对insert into语句启用部分列更新的语义,默认为 false。需要注意的是,控制insert语句是否开启严格模式的会话变量`enable_insert_strict`的默认值为true,即insert语句默认开启严格模式,而在严格模式下进行部分列更新不允许更新不存在的key。所以,在使用insert语句进行部分列更新的时候如果希望能插入不存在的key,需要在`enable_unique_key_partial_update`设置为true的基础上同时将`enable_insert_strict`设置为false。 +* `describe_extend_variant_column` + + 是否展示 variant 的拆解列。默认为 false。 + *** #### 关于语句执行超时控制的补充说明 diff --git a/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java b/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java index b9dc9260dd..981f5bb61e 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java +++ b/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java @@ -123,6 +123,7 @@ public abstract class Type { public static final ScalarType VARIANT = new ScalarType(PrimitiveType.VARIANT); public static final AnyType ANY_STRUCT_TYPE = new AnyStructType(); public static final AnyType ANY_ELEMENT_TYPE = new AnyElementType(); + private static final Map<String, Type> typeMap = new HashMap<>(); private static final Logger LOG = LogManager.getLogger(Type.class); private static final ArrayList<ScalarType> integerTypes; @@ -135,6 +136,43 @@ public abstract class Type { private static final ArrayList<Type> structSubTypes; private static final ArrayList<ScalarType> trivialTypes; + static { + typeMap.put("TINYINT", Type.TINYINT); + typeMap.put("SMALLINT", Type.SMALLINT); + typeMap.put("INT", Type.INT); + typeMap.put("BIGINT", Type.BIGINT); + typeMap.put("LARGEINT", Type.LARGEINT); + typeMap.put("UNSIGNED_TINYINT", Type.UNSUPPORTED); + typeMap.put("UNSIGNED_SMALLINT", Type.UNSUPPORTED); + typeMap.put("UNSIGNED_INT", Type.UNSUPPORTED); + typeMap.put("UNSIGNED_BIGINT", Type.UNSUPPORTED); + typeMap.put("FLOAT", Type.FLOAT); + typeMap.put("DISCRETE_DOUBLE", Type.DOUBLE); + typeMap.put("DOUBLE", Type.DOUBLE); + typeMap.put("CHAR", Type.CHAR); + typeMap.put("DATE", Type.DATE); + typeMap.put("DATEV2", Type.DATEV2); + typeMap.put("DATETIMEV2", Type.DATETIMEV2); + typeMap.put("DATETIME", Type.DATETIME); + typeMap.put("DECIMAL32", Type.DECIMAL32); + typeMap.put("DECIMAL64", Type.DECIMAL64); + typeMap.put("DECIMAL128I", Type.DECIMAL128); + typeMap.put("DECIMAL", Type.DECIMALV2); + typeMap.put("VARCHAR", Type.VARCHAR); + typeMap.put("STRING", Type.STRING); + typeMap.put("JSONB", Type.JSONB); + typeMap.put("VARIANT", Type.VARIANT); + typeMap.put("BOOLEAN", Type.BOOLEAN); + typeMap.put("HLL", Type.HLL); + typeMap.put("STRUCT", Type.STRUCT); + typeMap.put("LIST", Type.UNSUPPORTED); + typeMap.put("MAP", Type.MAP); + typeMap.put("OBJECT", Type.UNSUPPORTED); + typeMap.put("ARRAY", Type.ARRAY); + typeMap.put("QUANTILE_STATE", Type.QUANTILE_STATE); + typeMap.put("AGG_STATE", Type.AGG_STATE); + } + static { integerTypes = Lists.newArrayList(); integerTypes.add(TINYINT); @@ -2251,5 +2289,9 @@ public abstract class Type { } return false; } + + public static Type getTypeFromTypeName(String typeName) { + return typeMap.getOrDefault(typeName, Type.UNSUPPORTED); + } } diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 1be6d99e25..a81f8cf93c 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -4345,9 +4345,9 @@ opt_explain_options ::= // Describe statement describe_stmt ::= - describe_command table_name:table + describe_command table_name:table opt_partition_names:partitionNames {: - RESULT = new DescribeStmt(table, false); + RESULT = new DescribeStmt(table, false, partitionNames); :} | KW_SHOW KW_FIELDS KW_FROM table_name:table {: diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java index f632c00f9c..3325a73687 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescribeStmt.java @@ -92,6 +92,7 @@ public class DescribeStmt extends ShowStmt { private TableName dbTableName; private ProcNodeInterface node; + private PartitionNames partitionNames; List<List<String>> totalRows = new LinkedList<List<String>>(); @@ -106,6 +107,12 @@ public class DescribeStmt extends ShowStmt { this.isAllTables = isAllTables; } + public DescribeStmt(TableName dbTableName, boolean isAllTables, PartitionNames partitionNames) { + this.dbTableName = dbTableName; + this.isAllTables = isAllTables; + this.partitionNames = partitionNames; + } + public DescribeStmt(TableValuedFunctionRef tableValuedFunctionRef) { this.tableValuedFunctionRef = tableValuedFunctionRef; this.isTableValuedFunction = true; @@ -156,6 +163,13 @@ public class DescribeStmt extends ShowStmt { return; } + if (partitionNames != null) { + partitionNames.analyze(analyzer); + if (partitionNames.isTemp()) { + throw new AnalysisException("Do not support temp partitions"); + } + } + dbTableName.analyze(analyzer); if (!Env.getCurrentEnv().getAccessManager() @@ -178,9 +192,22 @@ public class DescribeStmt extends ShowStmt { if (table.getType() == TableType.OLAP) { procString += ((OlapTable) table).getBaseIndexId(); } else { + if (partitionNames != null) { + throw new AnalysisException(dbTableName.getTbl() + + " is not an OLAP table, describe table failed"); + } procString += table.getId(); } - + if (partitionNames != null) { + procString += "/"; + StringBuilder builder = new StringBuilder(); + for (String str : partitionNames.getPartitionNames()) { + builder.append(str); + builder.append(","); + } + builder.deleteCharAt(builder.length() - 1); + procString += builder.toString(); + } node = ProcService.getInstance().open(procString); if (node == null) { throw new AnalysisException("Describe table[" + dbTableName.getTbl() + "] failed"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/AggregateType.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/AggregateType.java index 1d6b862869..f86ea7855e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/AggregateType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/AggregateType.java @@ -23,7 +23,9 @@ import com.google.common.collect.Lists; import java.util.EnumMap; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; +import java.util.Map; public enum AggregateType { SUM("SUM"), @@ -39,6 +41,20 @@ public enum AggregateType { private static EnumMap<AggregateType, EnumSet<PrimitiveType>> compatibilityMap; + private static final Map<String, AggregateType> aggTypeMap = new HashMap<>(); + + static { + aggTypeMap.put("NONE", AggregateType.NONE); + aggTypeMap.put("SUM", AggregateType.SUM); + aggTypeMap.put("MIN", AggregateType.MIN); + aggTypeMap.put("MAX", AggregateType.MAX); + aggTypeMap.put("REPLACE", AggregateType.REPLACE); + aggTypeMap.put("REPLACE_IF_NOT_NULL", AggregateType.REPLACE_IF_NOT_NULL); + aggTypeMap.put("HLL_UNION",
AggregateType.HLL_UNION); + aggTypeMap.put("BITMAP_UNION", AggregateType.BITMAP_UNION); + aggTypeMap.put("QUANTILE_UNION", AggregateType.QUANTILE_UNION); + } + static { compatibilityMap = new EnumMap<>(AggregateType.class); List<PrimitiveType> primitiveTypeList = Lists.newArrayList(); @@ -181,4 +197,8 @@ public enum AggregateType { return null; } } + + public static AggregateType getAggTypeFromAggName(String typeName) { + return aggTypeMap.get(typeName); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 2284164363..adfa20fe1c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -2423,4 +2423,14 @@ public class OlapTable extends Table { } return false; } + + public List<Tablet> getAllTablets() throws AnalysisException { + List<Tablet> tablets = Lists.newArrayList(); + for (Partition partition : getPartitions()) { + for (Tablet tablet : partition.getBaseIndex().getTablets()) { + tablets.add(tablet); + } + } + return tablets; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java index 4775ba23ef..de91b59dce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexInfoProcDir.java @@ -24,6 +24,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.common.AnalysisException; +import org.apache.doris.qe.SessionVariable; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -128,6 +129,10 @@ public class IndexInfoProcDir implements ProcDirInterface { throw new AnalysisException("Index " + idxId + " does not exist"); } bfColumns = olapTable.getCopiedBfColumns(); + if (olapTable.hasVariantColumns() + && SessionVariable.enableDescribeExtendVariantColumn()) { + return new RemoteIndexSchemaProcDir(table, schema, bfColumns); + } } else { schema = table.getBaseSchema(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexSchemaProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexSchemaProcNode.java index 47da7a9d53..6f125217ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexSchemaProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/IndexSchemaProcNode.java @@ -49,10 +49,8 @@ public class IndexSchemaProcNode implements ProcNodeInterface { this.bfColumns = bfColumns; } - @Override - public ProcResult fetchResult() throws AnalysisException { + public static ProcResult createResult(List<Column> schema, Set<String> bfColumns) throws AnalysisException { Preconditions.checkNotNull(schema); - BaseProcResult result = new BaseProcResult(); result.setNames(TITLE_NAMES); @@ -105,4 +103,8 @@ public class IndexSchemaProcNode implements ProcNodeInterface { return result; } + @Override + public ProcResult fetchResult() throws AnalysisException { + return createResult(this.schema, this.bfColumns); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcDir.java new file mode 100644 index 0000000000..7852bd6b1c --- /dev/null +++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcDir.java @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.proc; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.util.FetchRemoteTabletSchemaUtil; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +/* + * SHOW PROC /dbs/dbId/tableId/index_schema/indexId + * show index schema + */ +public class RemoteIndexSchemaProcDir implements ProcDirInterface { + public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>() + .add("Field").add("Type").add("Null").add("Key") + .add("Default").add("Extra") + .build(); + + private List<Column> schema; + private Set<String> bfColumns; + private TableIf table; + + public RemoteIndexSchemaProcDir(TableIf table, List<Column> schema, Set<String> bfColumns) { + this.table = table; + this.schema = schema; + this.bfColumns = bfColumns; + } + + @Override + public ProcResult fetchResult() throws AnalysisException { + Preconditions.checkNotNull(table); + Preconditions.checkNotNull(schema); + List<Tablet> tablets = null; + table.readLock(); + try { + OlapTable olapTable = (OlapTable) table; + tablets = olapTable.getAllTablets(); + } finally { + table.readUnlock(); + } + List<Column> remoteSchema = new FetchRemoteTabletSchemaUtil(tablets).fetch(); + if (remoteSchema == null || remoteSchema.isEmpty()) { + throw new AnalysisException("fetch remote tablet schema failed"); + } + this.schema.addAll(remoteSchema); + return IndexSchemaProcNode.createResult(this.schema, this.bfColumns); + } + + @Override + public boolean register(String name, ProcNodeInterface node) { + return false; + } + + @Override + public ProcNodeInterface lookup(String partitionString) throws AnalysisException { + Preconditions.checkNotNull(table); + + List<String> partitionNameList = new ArrayList<>(Arrays.asList(partitionString.split(","))); + if (partitionNameList == null || partitionNameList.isEmpty()) { + throw new AnalysisException("Describe table[" + table.getName() + "] failed"); + } + List<Partition> partitions = Lists.newArrayList(); + table.readLock(); + try { + if (table.getType() == TableType.OLAP) { + OlapTable olapTable = (OlapTable) table; + for (String partitionName : partitionNameList) { + Partition partition = olapTable.getPartition(partitionName); + if (partition == null) {
throw new AnalysisException("Partition " + partitionName + " does not exist"); + } + partitions.add(partition); + } + } else { + throw new AnalysisException(table.getName() + " is not a OLAP table, describe table failed"); + } + } catch (Throwable t) { + throw new AnalysisException("Describe table[" + table.getName() + "] failed"); + } finally { + table.readUnlock(); + } + return new RemoteIndexSchemaProcNode(partitions, this.schema, this.bfColumns); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java new file mode 100644 index 0000000000..d5b3d46322 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RemoteIndexSchemaProcNode.java @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.proc; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.util.FetchRemoteTabletSchemaUtil; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +import java.util.List; +import java.util.Set; + +/* + * SHOW PROC /dbs/dbId/tableId/index_schema/indexId/partitionName" + * show index schema + */ +public class RemoteIndexSchemaProcNode implements ProcNodeInterface { + public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() + .add("Field").add("Type").add("Null").add("Key") + .add("Default").add("Extra") + .build(); + + private List partitions; + private List schema; + private Set bfColumns; + + public RemoteIndexSchemaProcNode(List partitions, List schema, Set bfColumns) { + this.partitions = partitions; + this.schema = schema; + this.bfColumns = bfColumns; + } + + @Override + public ProcResult fetchResult() throws AnalysisException { + Preconditions.checkNotNull(schema); + Preconditions.checkNotNull(partitions); + List tablets = Lists.newArrayList(); + for (Partition partition : partitions) { + MaterializedIndex idx = partition.getBaseIndex(); + for (Tablet tablet : idx.getTablets()) { + tablets.add(tablet); + } + } + List remoteSchema = new FetchRemoteTabletSchemaUtil(tablets).fetch(); + if (remoteSchema == null || remoteSchema.isEmpty()) { + throw new AnalysisException("fetch remote tablet schema failed"); + } + this.schema.addAll(remoteSchema); + return IndexSchemaProcNode.createResult(this.schema, this.bfColumns); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java new file mode 100644 index 0000000000..808b4b2a55 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/FetchRemoteTabletSchemaUtil.java @@ -0,0 +1,220 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +import org.apache.doris.catalog.AggregateType; +import org.apache.doris.catalog.ArrayType; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MapType; +import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.StructType; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.proto.InternalService.PFetchRemoteSchemaRequest; +import org.apache.doris.proto.InternalService.PFetchRemoteSchemaResponse; +import org.apache.doris.proto.InternalService.PTabletsLocation; +import org.apache.doris.proto.OlapFile.ColumnPB; +import org.apache.doris.proto.OlapFile.TabletSchemaPB; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TStatusCode; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +// This class is used to pull the columns of the specified tablets that exist on the Backends (BE), +// including both regular columns and the columns decomposed from variants +public class FetchRemoteTabletSchemaUtil { + private static final Logger LOG = LogManager.getLogger(FetchRemoteTabletSchemaUtil.class); + + private List<Tablet> remoteTablets; + private List<Column> tableColumns; + + public FetchRemoteTabletSchemaUtil(List<Tablet> tablets) { + this.remoteTablets = tablets; + this.tableColumns = Lists.newArrayList(); + } + + public List<Column> fetch() { + // 1.
Find which Backend (BE) servers the tablets are on + Preconditions.checkNotNull(remoteTablets); + Map<Long, Set<Long>> beIdToTabletId = Maps.newHashMap(); + for (Tablet tablet : remoteTablets) { + for (Replica replica : tablet.getReplicas()) { + // only need alive replicas + if (replica.isAlive()) { + Set<Long> tabletIds = beIdToTabletId.computeIfAbsent( + replica.getBackendId(), k -> Sets.newHashSet()); + tabletIds.add(tablet.getId()); + } + } + } + + // 2. Randomly select 2 Backend (BE) servers to act as coordinators. + // A coordinator BE is responsible for collecting all table columns and returning them to the FE. + // Two BEs are selected so that the second one provides a retry opportunity if the first attempt fails. + List<PTabletsLocation> locations = Lists.newArrayList(); + List<Backend> coordinatorBackend = Lists.newArrayList(); + for (Map.Entry<Long, Set<Long>> entry : beIdToTabletId.entrySet()) { + Long backendId = entry.getKey(); + Set<Long> tabletIds = entry.getValue(); + Backend backend = Env.getCurrentEnv().getCurrentSystemInfo().getBackend(backendId); + // only need alive BEs + if (backend == null || !backend.isAlive()) { + continue; + } + // need 2 BEs to provide a retry + if (coordinatorBackend.size() < 2) { + coordinatorBackend.add(backend); + } + PTabletsLocation.Builder locationBuilder = PTabletsLocation.newBuilder() + .setHost(backend.getHost()) + .setBrpcPort(backend.getBrpcPort()); + PTabletsLocation location = locationBuilder.addAllTabletId(tabletIds).build(); + locations.add(location); + } + PFetchRemoteSchemaRequest.Builder requestBuilder = PFetchRemoteSchemaRequest.newBuilder() + .addAllTabletLocation(locations) + .setIsCoordinator(true); + // 3. Send the rpc to the coordinator backends until one succeeds + for (Backend be : coordinatorBackend) { + try { + PFetchRemoteSchemaRequest request = requestBuilder.build(); + Future<PFetchRemoteSchemaResponse> future = BackendServiceProxy.getInstance() + .fetchRemoteTabletSchemaAsync(be.getBrpcAddress(), request); + PFetchRemoteSchemaResponse response = null; + try { + response = future.get(60, TimeUnit.SECONDS); + TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode()); + String errMsg; + if (code != TStatusCode.OK) { + if (!response.getStatus().getErrorMsgsList().isEmpty()) { + errMsg = response.getStatus().getErrorMsgsList().get(0); + } else { + errMsg = "fetchRemoteTabletSchemaAsync failed.
backend address: " + + be.getHost() + " : " + be.getBrpcPort(); + } + throw new RpcException(be.getHost(), errMsg); + } + fillColumns(response); + return tableColumns; + } catch (AnalysisException e) { + // continue to get result + LOG.warn(e); + } catch (InterruptedException e) { + // continue to get result + LOG.warn("fetch remote schema future get interrupted Exception"); + } catch (TimeoutException e) { + future.cancel(true); + // continue to get result + LOG.warn("fetch remote schema result timeout, addr {}", be.getBrpcAddress()); + } + } catch (RpcException e) { + LOG.warn("fetch remote schema result rpc exception {}, e {}", be.getBrpcAddress(), e); + } catch (ExecutionException e) { + LOG.warn("fetch remote schema ExecutionException, addr {}, e {}", be.getBrpcAddress(), e); + } + } + return tableColumns; + } + + private void fillColumns(PFetchRemoteSchemaResponse response) throws AnalysisException { + TabletSchemaPB schemaPB = response.getMergedSchema(); + for (ColumnPB columnPB : schemaPB.getColumnList()) { + try { + Column remoteColumn = initColumnFromPB(columnPB); + tableColumns.add(remoteColumn); + } catch (Exception e) { + throw new AnalysisException("column default value to string failed"); + } + } + // sort the columns + Collections.sort(tableColumns, new Comparator() { + @Override + public int compare(Column c1, Column c2) { + return c1.getName().compareTo(c2.getName()); + } + }); + } + + private Column initColumnFromPB(ColumnPB column) throws AnalysisException { + try { + AggregateType aggType = AggregateType.getAggTypeFromAggName(column.getAggregation()); + Type type = Type.getTypeFromTypeName(column.getType()); + String columnName = column.getName(); + boolean isKey = column.getIsKey(); + boolean isNullable = column.getIsNullable(); + String defaultValue = column.getDefaultValue().toString("UTF-8"); + if (defaultValue.equals("")) { + defaultValue = null; + } + if (isKey) { + aggType = null; + } + do { + if (type.isArrayType()) { + List childColumn = column.getChildrenColumnsList(); + if (childColumn == null || childColumn.size() != 1) { + break; + } + Column child = initColumnFromPB(childColumn.get(0)); + type = new ArrayType(child.getType()); + } else if (type.isMapType()) { + List childColumn = column.getChildrenColumnsList(); + if (childColumn == null || childColumn.size() != 2) { + break; + } + Column keyChild = initColumnFromPB(childColumn.get(0)); + Column valueChild = initColumnFromPB(childColumn.get(1)); + type = new MapType(keyChild.getType(), valueChild.getType()); + } else if (type.isStructType()) { + List childColumn = column.getChildrenColumnsList(); + if (childColumn == null) { + break; + } + List childTypes = Lists.newArrayList(); + for (ColumnPB childPB : childColumn) { + childTypes.add(initColumnFromPB(childPB).getType()); + } + type = new StructType(childTypes); + } + } while (false); + return new Column(columnName, type, isKey, aggType, isNullable, + defaultValue, "remote schema"); + } catch (Exception e) { + throw new AnalysisException("default value to string failed"); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index ebb5741671..33603dd78a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -474,6 +474,8 @@ public class SessionVariable implements Serializable, Writable { public static final String WAIT_FULL_BLOCK_SCHEDULE_TIMES = 
"wait_full_block_schedule_times"; + public static final String DESCRIBE_EXTEND_VARIANT_COLUMN = "describe_extend_variant_column"; + public static final List DEBUG_VARIABLES = ImmutableList.of( SKIP_DELETE_PREDICATE, SKIP_DELETE_BITMAP, @@ -857,6 +859,9 @@ public class SessionVariable implements Serializable, Writable { return beNumberForTest; } + @VariableMgr.VarAttr(name = DESCRIBE_EXTEND_VARIANT_COLUMN, needForward = true) + public boolean enableDescribeExtendVariantColumn = false; + @VariableMgr.VarAttr(name = PROFILLING) public boolean profiling = false; @@ -3057,6 +3062,22 @@ public class SessionVariable implements Serializable, Writable { } } + public boolean getEnableDescribeExtendVariantColumn() { + return enableDescribeExtendVariantColumn; + } + + public void setEnableDescribeExtendVariantColumn(boolean enableDescribeExtendVariantColumn) { + this.enableDescribeExtendVariantColumn = enableDescribeExtendVariantColumn; + } + + public static boolean enableDescribeExtendVariantColumn() { + ConnectContext connectContext = ConnectContext.get(); + if (connectContext == null) { + return false; + } + return connectContext.getSessionVariable().enableDescribeExtendVariantColumn; + } + public int getProfileLevel() { return this.profileLevel; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index 9535d075e2..f3f4440e03 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -152,6 +152,11 @@ public class BackendServiceClient { return stub.reportStreamLoadStatus(request); } + public Future fetchRemoteTabletSchemaAsync( + InternalService.PFetchRemoteSchemaRequest request) { + return stub.fetchRemoteTabletSchema(request); + } + public Future glob(InternalService.PGlobRequest request) { return stub.glob(request); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index 80767acecf..02245c83ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -466,5 +466,17 @@ public class BackendServiceProxy { } } + public Future fetchRemoteTabletSchemaAsync( + TNetworkAddress address, InternalService.PFetchRemoteSchemaRequest request) throws RpcException { + try { + final BackendServiceClient client = getProxy(address); + return client.fetchRemoteTabletSchemaAsync(request); + } catch (Throwable e) { + LOG.warn("fetch remote tablet schema catch a exception, address={}:{}", + address.getHostname(), address.getPort(), e); + throw new RpcException(address.hostname, e.getMessage()); + } + } + } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index ad66efc62a..3676d854a9 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -803,6 +803,23 @@ message PGetWalQueueSizeResponse{ optional int64 size = 2; } +message PTabletsLocation { + required string host = 1; + required int32 brpc_port = 2; + repeated int64 tablet_id = 3; +} + +message PFetchRemoteSchemaRequest { + repeated PTabletsLocation tablet_location = 1; + required bool is_coordinator = 2; +} + +message PFetchRemoteSchemaResponse { + optional PStatus status = 1; + // intermediate merged schema + optional TabletSchemaPB merged_schema = 2; +} + service 
PBackendService { rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult); rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult); @@ -846,5 +863,6 @@ service PBackendService { rpc group_commit_insert(PGroupCommitInsertRequest) returns (PGroupCommitInsertResponse); rpc get_wal_queue_size(PGetWalQueueSizeRequest) returns(PGetWalQueueSizeResponse); rpc fetch_arrow_flight_schema(PFetchArrowFlightSchemaRequest) returns (PFetchArrowFlightSchemaResult); + rpc fetch_remote_tablet_schema(PFetchRemoteSchemaRequest) returns (PFetchRemoteSchemaResponse); }; diff --git a/regression-test/data/variant_p0/desc.out b/regression-test/data/variant_p0/desc.out new file mode 100644 index 0000000000..f6f3d91048 --- /dev/null +++ b/regression-test/data/variant_p0/desc.out @@ -0,0 +1,196 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql_1 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.xxxx TEXT Yes false \N + +-- !sql_2 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.ddd.aaa TINYINT Yes false \N +v.ddd.mxmxm JSON Yes false \N +v.xxxx TEXT Yes false \N + +-- !sql_3 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.b JSON Yes false \N +v.c.c SMALLINT Yes false \N +v.c.e DOUBLE Yes false \N +v.xxxx TEXT Yes false \N + +-- !sql_6_1 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.ddd.aaa TINYINT Yes false \N +v.ddd.mxmxm JSON Yes false \N +v.xxxx TEXT Yes false \N + +-- !sql_6_2 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.xxxx TEXT Yes false \N + +-- !sql_6_3 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.b JSON Yes false \N +v.c.c SMALLINT Yes false \N +v.c.e DOUBLE Yes false \N + +-- !sql_6 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.b JSON Yes false \N +v.c.c SMALLINT Yes false \N +v.c.e DOUBLE Yes false \N +v.ddd.aaa TINYINT Yes false \N +v.ddd.mxmxm JSON Yes false \N +v.xxxx TEXT Yes false \N + +-- !sql_7 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.b JSON Yes false \N +v.c.c SMALLINT Yes false \N +v.c.e DOUBLE Yes false \N +v.xxxx TEXT Yes false \N + +-- !sql_7_1 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.xxxx TEXT Yes false \N + +-- !sql_7_2 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.b JSON Yes false \N +v.c.c SMALLINT Yes false \N +v.c.e DOUBLE Yes false \N + +-- !sql_7_3 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.b JSON Yes false \N +v.c.c SMALLINT Yes false \N +v.c.e DOUBLE Yes false \N +v.xxxx TEXT Yes false \N + +-- !sql_8 -- +k BIGINT Yes true \N +v1 VARIANT Yes false \N NONE +v2 VARIANT Yes false \N NONE +v3 VARIANT Yes false \N NONE +v1.a SMALLINT Yes false \N +v1.b JSON Yes false \N +v1.c.c SMALLINT Yes false \N +v1.c.e DOUBLE Yes false \N +v1.oooo.xxxx.xxx TINYINT Yes false \N +v2.a SMALLINT Yes false \N +v2.xxxx TEXT Yes false \N +v3.a SMALLINT Yes false \N +v3.b JSON Yes false \N +v3.c.c SMALLINT Yes false \N +v3.c.e DOUBLE Yes false \N + +-- !sql_9 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE + +-- !sql_9_1 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.b JSON Yes false \N +v.c.c SMALLINT Yes false \N +v.c.e DOUBLE Yes false \N +v.oooo.xxxx.xxx TINYINT 
Yes false \N + +-- !sql_10 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.k1 TINYINT Yes false \N +v.k2 TEXT Yes false \N +v.k3 ARRAY Yes false [] +v.k4 DOUBLE Yes false \N +v.k5 JSON Yes false \N + +-- !sql_10_1 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v2 VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.b JSON Yes false \N +v.c.c SMALLINT Yes false \N +v.c.e DOUBLE Yes false \N +v.k1 TINYINT Yes false \N +v.k2 TEXT Yes false \N +v.k3 ARRAY Yes false [] +v.k4 DOUBLE Yes false \N +v.k5 JSON Yes false \N +v.oooo.xxxx.xxx TINYINT Yes false \N +v2.a SMALLINT Yes false \N +v2.b JSON Yes false \N +v2.c.c SMALLINT Yes false \N +v2.c.e DOUBLE Yes false \N +v2.oooo.xxxx.xxx TINYINT Yes false \N + +-- !sql_10_2 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.b JSON Yes false \N +v.c.c SMALLINT Yes false \N +v.c.e DOUBLE Yes false \N +v.k1 TINYINT Yes false \N +v.k2 TEXT Yes false \N +v.k3 ARRAY Yes false [] +v.k4 DOUBLE Yes false \N +v.k5 JSON Yes false \N +v.oooo.xxxx.xxx TINYINT Yes false \N +v2.a SMALLINT Yes false \N +v2.b JSON Yes false \N +v2.c.c SMALLINT Yes false \N +v2.c.e DOUBLE Yes false \N +v2.oooo.xxxx.xxx TINYINT Yes false \N + +-- !sql_10_3 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v3 VARIANT Yes false \N NONE +v.a SMALLINT Yes false \N +v.b JSON Yes false \N +v.c.c SMALLINT Yes false \N +v.c.e DOUBLE Yes false \N +v.k1 TINYINT Yes false \N +v.k2 TEXT Yes false \N +v.k3 ARRAY Yes false [] +v.k4 DOUBLE Yes false \N +v.k5 JSON Yes false \N +v.oooo.xxxx.xxx TINYINT Yes false \N +v3.a SMALLINT Yes false \N +v3.b JSON Yes false \N +v3.c.c SMALLINT Yes false \N +v3.c.e DOUBLE Yes false \N +v3.oooo.xxxx.xxx TINYINT Yes false \N + +-- !sql_11 -- +k BIGINT Yes true \N +v VARIANT Yes false \N NONE +v.!@#^&*() TEXT Yes false \N +v.名字 TEXT Yes false \N +v.画像.丬文 TEXT Yes false \N +v.画像.地址 TEXT Yes false \N +v.金额 SMALLINT Yes false \N + diff --git a/regression-test/suites/variant_p0/desc.groovy b/regression-test/suites/variant_p0/desc.groovy new file mode 100644 index 0000000000..fc9332acc5 --- /dev/null +++ b/regression-test/suites/variant_p0/desc.groovy @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("regression_test_variant_desc", "nonConcurrent"){ + + def load_json_data = {table_name, file_name -> + // load the json data + streamLoad { + table "${table_name}" + + // set http request header params + set 'read_json_by_line', 'true' + set 'format', 'json' + set 'max_filter_ratio', '0.1' + file file_name // import json file + time 10000 // limit inflight 10s + + // if a check callback is declared, the default check condition will be ignored.
+ // So you must check all conditions yourself + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + logger.info("Stream load ${file_name} result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + // assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + + def create_table = { table_name, buckets="auto" -> + sql "DROP TABLE IF EXISTS ${table_name}" + sql """ + CREATE TABLE IF NOT EXISTS ${table_name} ( + k bigint, + v variant + ) + DUPLICATE KEY(`k`) + DISTRIBUTED BY HASH(k) BUCKETS ${buckets} + properties("replication_num" = "1", "disable_auto_compaction" = "false"); + """ + } + + def create_table_partition = { table_name, buckets="auto" -> + sql "DROP TABLE IF EXISTS ${table_name}" + sql """ + CREATE TABLE IF NOT EXISTS ${table_name} ( + k bigint, + v variant + ) + DUPLICATE KEY(`k`) + PARTITION BY RANGE(k) + ( + PARTITION p1 VALUES LESS THAN (3000), + PARTITION p2 VALUES LESS THAN (50000), + PARTITION p3 VALUES LESS THAN (100000) + ) + DISTRIBUTED BY HASH(k) BUCKETS ${buckets} + properties("replication_num" = "1", "disable_auto_compaction" = "false"); + """ + } + + def set_be_config = { key, value -> + String backend_id; + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } + + try { + // sparse columns + def table_name = "sparse_columns" + create_table table_name + set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95") + sql """set describe_extend_variant_column = true""" + sql """insert into sparse_columns select 0, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}' as json_str + union all select 0, '{"a": 1123}' as json_str union all select 0, '{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096") limit 4096 ;""" + qt_sql_1 """desc ${table_name}""" + sql "truncate table sparse_columns" + sql """insert into sparse_columns select 0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}' as json_str + union all select 0, '{"a" : 1234, "xxxx" : "kaana", "ddd" : {"aaa" : 123, "mxmxm" : [456, "789"]}}' as json_str from numbers("number" = "4096") limit 4096 ;""" + qt_sql_2 """desc ${table_name}""" + sql "truncate table sparse_columns" + + // no sparse columns + table_name = "no_sparse_columns" + create_table.call(table_name, "4") + sql "set enable_two_phase_read_opt = false;" + set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "1") + sql """insert into ${table_name} select 0, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}' as json_str + union all select 0, '{"a": 1123}' as json_str union all select 0, '{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096") limit 4096 ;""" + qt_sql_3 """desc ${table_name}""" + sql "truncate table ${table_name}" + + // partition + table_name = "partition_data" + create_table_partition.call(table_name, "4") + sql "set enable_two_phase_read_opt = false;"
+ set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95") + sql """insert into ${table_name} select 2500, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}' as json_str + union all select 2500, '{"a" : 1234, "xxxx" : "kaana", "ddd" : {"aaa" : 123, "mxmxm" : [456, "789"]}}' as json_str from numbers("number" = "4096") limit 4096 ;""" + sql """insert into ${table_name} select 45000, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}' as json_str + union all select 45000, '{"a": 1123}' as json_str union all select 45000, '{"a" : 1234, "xxxx" : "kaana"}' as json_str from numbers("number" = "4096") limit 4096 ;""" + sql """insert into ${table_name} values(95000, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')""" + qt_sql_6_1 """desc ${table_name} partition p1""" + qt_sql_6_2 """desc ${table_name} partition p2""" + qt_sql_6_3 """desc ${table_name} partition p3""" + qt_sql_6 """desc ${table_name}""" + sql "truncate table ${table_name}" + + // drop partition + table_name = "drop_partition" + create_table_partition.call(table_name, "4") + // insert into partition p1 + sql """insert into ${table_name} values(2500, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')""" + // insert into partition p2 + sql """insert into ${table_name} values(45000, '{"a": 11245, "xxxx" : "kaana"}')""" + // insert into partition p3 + sql """insert into ${table_name} values(95000, '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')""" + // drop p1 + sql """alter table ${table_name} drop partition p1""" + qt_sql_7 """desc ${table_name}""" + qt_sql_7_1 """desc ${table_name} partition p2""" + qt_sql_7_2 """desc ${table_name} partition p3""" + qt_sql_7_3 """desc ${table_name} partition (p2, p3)""" + sql "truncate table ${table_name}" + + // more variant + table_name = "more_variant_table" + sql """ + CREATE TABLE IF NOT EXISTS ${table_name} ( + k bigint, + v1 variant, + v2 variant, + v3 variant + ) + DUPLICATE KEY(`k`) + DISTRIBUTED BY HASH(k) BUCKETS 5 + properties("replication_num" = "1", "disable_auto_compaction" = "false"); + """ + sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}', '{"a": 11245, "xxxx" : "kaana"}', '{"a": 11245, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}}')""" + qt_sql_8 """desc ${table_name}""" + sql "truncate table ${table_name}" + + // describe_extend_variant_column = false + sql """set describe_extend_variant_column = false""" + table_name = "no_extend_variant_column" + sql """ + CREATE TABLE IF NOT EXISTS ${table_name} ( + k bigint, + v variant + ) + DUPLICATE KEY(`k`) + DISTRIBUTED BY HASH(k) BUCKETS 5 + properties("replication_num" = "1", "disable_auto_compaction" = "false"); + """ + sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}')""" + qt_sql_9 """desc ${table_name}""" + sql """set describe_extend_variant_column = true""" + qt_sql_9_1 """desc ${table_name}""" + sql "truncate table ${table_name}" + + // schema change: add varaint + table_name = "schema_change_table" + create_table.call(table_name, "5") + // add, drop columns + sql """INSERT 
INTO ${table_name} values(0, '{"k1":1, "k2": "hello world", "k3" : [1234], "k4" : 1.10000, "k5" : [[123]]}')""" + sql """set describe_extend_variant_column = true""" + qt_sql_10 """desc ${table_name}""" + // add column + sql "alter table ${table_name} add column v2 variant default null" + sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}', + '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}')""" + qt_sql_10_1 """desc ${table_name}""" + // drop column + sql "alter table ${table_name} drop column v2" + qt_sql_10_2 """desc ${table_name}""" + // add column + sql "alter table ${table_name} add column v3 variant default null" + sql """ insert into ${table_name} values (0, '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}', + '{"a": 1123, "b" : [123, {"xx" : 1}], "c" : {"c" : 456, "d" : null, "e" : 7.111}, "zzz" : null, "oooo" : {"akakaka" : null, "xxxx" : {"xxx" : 123}}}')""" + qt_sql_10_3 """desc ${table_name}""" + //sql "truncate table ${table_name}" + + // variant column name: Chinese name, unicode + table_name = "chinese_table" + sql """ + CREATE TABLE IF NOT EXISTS ${table_name} ( + k bigint, + v variant + ) + DUPLICATE KEY(`k`) + DISTRIBUTED BY HASH(k) BUCKETS 5 + properties("replication_num" = "1", "disable_auto_compaction" = "false"); + """ + sql """ insert into ${table_name} values (0, '{"名字" : "jack", "!@#^&*()": "11111", "金额" : 200, "画像" : {"地址" : "北京", "\\\u4E2C\\\u6587": "unicode"}}')""" + sql """set describe_extend_variant_column = true""" + qt_sql_11 """desc ${table_name}""" + } finally { + // reset flags + set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95") + } +}
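As an end-to-end sketch of what this patch enables, adapted from the regression test above (the table name is hypothetical, and the extracted columns shown by DESC depend on the ingested JSON):

```sql
CREATE TABLE example_variant_tbl (
    k BIGINT,
    v VARIANT
)
DUPLICATE KEY(`k`)
PARTITION BY RANGE(k) (
    PARTITION p1 VALUES LESS THAN (3000),
    PARTITION p2 VALUES LESS THAN (50000)
)
DISTRIBUTED BY HASH(k) BUCKETS 4
PROPERTIES("replication_num" = "1");

INSERT INTO example_variant_tbl VALUES (1, '{"a": 1, "xxxx": "hello"}');

SET describe_extend_variant_column = true;
-- The FE fans out fetch_remote_tablet_schema RPCs to the BEs holding the tablets,
-- merges the returned tablet schemas via get_least_common_schema, and appends the
-- extracted columns (here v.a and v.xxxx) to the DESC output.
DESC example_variant_tbl;

-- The new opt_partition_names grammar also allows restricting DESC to partitions:
DESC example_variant_tbl PARTITION p1;
DESC example_variant_tbl PARTITION (p1, p2);
```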