[bugfix](paimon)support native and jni to read paimon for minio/cos #29933

This commit is contained in:
wuwenchi
2024-01-16 17:18:10 +08:00
committed by yiguolei
parent 4bf4239d7a
commit 74991c4af2
8 changed files with 419 additions and 271 deletions

View File

@ -33,7 +33,7 @@ under the License.
## Instructions for use
1. When the data is stored in HDFS, core-site.xml, hdfs-site.xml and hive-site.xml need to be placed in the conf directory of both FE and BE. The Hadoop configuration files in the conf directory are read first, and then the configuration files referenced by the environment variable `HADOOP_CONF_DIR`.
2. The currently adapted version of Paimon is 0.5.0
2. The currently adapted version of Paimon is 0.6.0
## Create Catalog
@ -73,11 +73,11 @@ CREATE CATALOG `paimon_kerberos` PROPERTIES (
);
```
#### S3
#### MINIO
> Note that.
>
> Users need to download [paimon-s3-0.5.0-incubating.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-s3/0.5.0-incubating/paimon-s3-0.5.0-incubating.jar)
> Users need to download [paimon-s3-0.6.0-incubating.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-s3/0.6.0-incubating/paimon-s3-0.6.0-incubating.jar)
>
> Place it in directory `${DORIS_HOME}/be/lib/java_extensions/preload-extensions` and restart be
>
@ -86,18 +86,38 @@ CREATE CATALOG `paimon_kerberos` PROPERTIES (
```sql
CREATE CATALOG `paimon_s3` PROPERTIES (
"type" = "paimon",
"warehouse" = "s3://paimon-1308700295.cos.ap-beijing.myqcloud.com/paimoncos",
"s3.endpoint" = "cos.ap-beijing.myqcloud.com",
"warehouse" = "s3://bucket_name/paimons3",
"s3.endpoint" = "http://<ip>:<port>",
"s3.access_key" = "ak",
"s3.secret_key" = "sk"
);
```
#### COS
> Note that.
>
> Users need to download [paimon-s3-0.6.0-incubating.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-s3/0.6.0-incubating/paimon-s3-0.6.0-incubating.jar)
>
> Place it in directory `${DORIS_HOME}/be/lib/java_extensions/preload-extensions` and restart be
>
> Starting from version 2.0.2, this file can be placed in BE's `custom_lib/` directory (if it does not exist, just create it manually) to prevent the file from being lost due to the replacement of the lib directory when upgrading the cluster.
```sql
CREATE CATALOG `paimon_cos` PROPERTIES (
"type" = "paimon",
"warehouse" = "cosn://paimon-1308700295/paimoncos",
"cos.endpoint" = "cos.ap-beijing.myqcloud.com",
"cos.access_key" = "ak",
"cos.secret_key" = "sk"
);
```
#### OSS
>Note that.
>
> Users need to download [paimon-oss-0.5.0-incubating.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-oss/0.5.0-incubating/paimon-oss-0.5.0-incubating.jar)
> Users need to download [paimon-oss-0.6.0-incubating.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-oss/0.6.0-incubating/paimon-oss-0.6.0-incubating.jar)
>
> Place it in directory `${DORIS_HOME}/be/lib/java_extensions/preload-extensions` and restart be

View File

@ -33,7 +33,7 @@ under the License.
## 使用须知
1. 数据放在hdfs时,需要将 core-site.xml,hdfs-site.xml 和 hive-site.xml 放到 FE 和 BE 的 conf 目录下。优先读取 conf 目录下的 hadoop 配置文件,再读取环境变量 `HADOOP_CONF_DIR` 的相关配置文件。
2. 当前适配的paimon版本为0.5.0
2. 当前适配的paimon版本为0.6.0
## 创建 Catalog
@ -73,11 +73,11 @@ CREATE CATALOG `paimon_kerberos` PROPERTIES (
);
```
#### S3
#### MINIO
> 注意:
>
> 用户需要手动下载[paimon-s3-0.5.0-incubating.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-s3/0.5.0-incubating/paimon-s3-0.5.0-incubating.jar)
> 用户需要手动下载[paimon-s3-0.6.0-incubating.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-s3/0.6.0-incubating/paimon-s3-0.6.0-incubating.jar)
> 放在 `${DORIS_HOME}/be/lib/java_extensions/preload-extensions` 目录下并重启be。
>
@ -86,18 +86,38 @@ CREATE CATALOG `paimon_kerberos` PROPERTIES (
```sql
CREATE CATALOG `paimon_s3` PROPERTIES (
"type" = "paimon",
"warehouse" = "s3://paimon-1308700295.cos.ap-beijing.myqcloud.com/paimoncos",
"s3.endpoint" = "cos.ap-beijing.myqcloud.com",
"warehouse" = "s3://bucket_name/paimons3",
"s3.endpoint" = "http://<ip>:<port>",
"s3.access_key" = "ak",
"s3.secret_key" = "sk"
);
```
#### COS
> 注意:
>
> 用户需要手动下载[paimon-s3-0.6.0-incubating.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-s3/0.6.0-incubating/paimon-s3-0.6.0-incubating.jar)
> 放在 `${DORIS_HOME}/be/lib/java_extensions/preload-extensions` 目录下并重启be。
>
> 从 2.0.2 版本起,可以将这个文件放置在BE的 `custom_lib/` 目录下(如不存在,手动创建即可),以防止升级集群时因为 lib 目录被替换而导致文件丢失。
```sql
CREATE CATALOG `paimon_cos` PROPERTIES (
"type" = "paimon",
"warehouse" = "cosn://paimon-1308700295/paimoncos",
"cos.endpoint" = "cos.ap-beijing.myqcloud.com",
"cos.access_key" = "ak",
"cos.secret_key" = "sk"
);
```
#### OSS
>注意:
>
> 用户需要手动下载[paimon-oss-0.5.0-incubating.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-oss/0.5.0-incubating/paimon-oss-0.5.0-incubating.jar)
> 用户需要手动下载[paimon-oss-0.6.0-incubating.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-oss/0.6.0-incubating/paimon-oss-0.6.0-incubating.jar)
> 放在 `${DORIS_HOME}/be/lib/java_extensions/preload-extensions` 目录下并重启be
```sql

View File

@ -18,6 +18,8 @@
package org.apache.doris.datasource.paimon;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.CosProperties;
import org.apache.doris.datasource.property.constants.PaimonProperties;
import org.apache.logging.log4j.LogManager;
@ -57,6 +59,19 @@ public class PaimonFileExternalCatalog extends PaimonExternalCatalog {
properties.get(PaimonProperties.PAIMON_OSS_ACCESS_KEY));
options.put(PaimonProperties.PAIMON_OSS_SECRET_KEY,
properties.get(PaimonProperties.PAIMON_OSS_SECRET_KEY));
} else if (properties.containsKey(CosProperties.ENDPOINT)) {
options.put(PaimonProperties.PAIMON_S3_ENDPOINT,
properties.get(CosProperties.ENDPOINT));
options.put(PaimonProperties.PAIMON_S3_ACCESS_KEY,
properties.get(CosProperties.ACCESS_KEY));
options.put(PaimonProperties.PAIMON_S3_SECRET_KEY,
properties.get(CosProperties.SECRET_KEY));
options.put(PaimonProperties.WAREHOUSE,
options.get(PaimonProperties.WAREHOUSE).replace("cosn://", "s3://"));
}
if (properties.containsKey(PropertyConverter.USE_PATH_STYLE)) {
options.put(PaimonProperties.S3_PATH_STYLE, properties.get(PropertyConverter.USE_PATH_STYLE));
}
}
}

View File

@ -25,6 +25,7 @@ import java.util.Map;
public class PaimonProperties {
public static final String WAREHOUSE = "warehouse";
public static final String S3_PATH_STYLE = "s3.path.style.access";
public static final String FILE_FORMAT = "file.format";
public static final String PAIMON_PREFIX = "paimon";
public static final String PAIMON_CATALOG_TYPE = "metastore";

View File

@ -31,6 +31,7 @@ import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.external.FileQueryScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TFileAttributes;
@ -43,6 +44,8 @@ import org.apache.doris.thrift.TTableFormatFileDesc;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.source.DataSplit;
@ -53,6 +56,7 @@ import org.apache.paimon.utils.InstantiationUtil;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -60,6 +64,7 @@ import java.util.Set;
import java.util.stream.Collectors;
public class PaimonScanNode extends FileQueryScanNode {
private static final Logger LOG = LogManager.getLogger(PaimonScanNode.class);
private PaimonSource source = null;
private List<Predicate> predicates;
@ -110,6 +115,7 @@ public class PaimonScanNode extends FileQueryScanNode {
TPaimonFileDesc fileDesc = new TPaimonFileDesc();
org.apache.paimon.table.source.Split split = paimonSplit.getSplit();
if (split != null) {
// use jni reader
fileDesc.setPaimonSplit(encodeObjectToString(split));
}
fileDesc.setFileFormat(source.getFileFormat());
@ -129,6 +135,7 @@ public class PaimonScanNode extends FileQueryScanNode {
@Override
public List<Split> getSplits() throws UserException {
boolean forceJniScanner = ConnectContext.get().getSessionVariable().isForceJniScanner();
List<Split> splits = new ArrayList<>();
int[] projected = desc.getSlots().stream().mapToInt(
slot -> (source.getPaimonTable().rowType().getFieldNames().indexOf(slot.getColumn().getName())))
@ -139,7 +146,7 @@ public class PaimonScanNode extends FileQueryScanNode {
.newScan().plan().splits();
boolean supportNative = supportNativeReader();
for (org.apache.paimon.table.source.Split split : paimonSplits) {
if (supportNative && split instanceof DataSplit) {
if (!forceJniScanner && supportNative && split instanceof DataSplit) {
DataSplit dataSplit = (DataSplit) split;
Optional<List<RawFile>> optRowFiles = dataSplit.convertToRawFiles();
if (optRowFiles.isPresent()) {
@ -234,7 +241,13 @@ public class PaimonScanNode extends FileQueryScanNode {
/**
 * Returns the properties used to access the storage location of this Paimon scan.
 *
 * NOTE(review): this span appears to be a diff hunk that shows the pre-change
 * statement (the direct return of the Hadoop properties) immediately followed by
 * its replacement; presumably only the replacement is meant to remain — confirm
 * against the committed file.
 *
 * @return a map of location properties
 * @throws MetaNotFoundException declared by the overridden signature
 * @throws DdlException declared by the overridden signature
 */
@Override
public Map<String, String> getLocationProperties() throws MetaNotFoundException, DdlException {
// Pre-change behavior: hand back the catalog's Hadoop properties as-is.
return source.getCatalog().getCatalogProperty().getHadoopProperties();
// Start from a copy of the catalog's own properties so the catalog's map is not mutated.
HashMap<String, String> map = new HashMap<>(source.getCatalog().getProperties());
// Add Hadoop properties only for keys not already present, so a catalog
// property wins over a Hadoop property with the same key.
source.getCatalog().getCatalogProperty().getHadoopProperties().forEach((k, v) -> {
if (!map.containsKey(k)) {
map.put(k, v);
}
});
return map;
}
}

View File

@ -107,7 +107,7 @@ public class SessionVariable implements Serializable, Writable {
public static final String SQL_SAFE_UPDATES = "sql_safe_updates";
public static final String NET_BUFFER_LENGTH = "net_buffer_length";
public static final String CODEGEN_LEVEL = "codegen_level";
public static final String HAVE_QUERY_CACHE = "have_query_cache";
public static final String HAVE_QUERY_CACHE = "have_query_cache";
// mem limit can't smaller than bufferpool's default page size
public static final int MIN_EXEC_MEM_LIMIT = 2097152;
public static final String BATCH_SIZE = "batch_size";
@ -135,7 +135,8 @@ public class SessionVariable implements Serializable, Writable {
// if set to true, some of stmt will be forwarded to master FE to get result
public static final String FORWARD_TO_MASTER = "forward_to_master";
// user can set instance num after exchange, no need to be equal to nums of before exchange
// user can set instance num after exchange, no need to be equal to nums of
// before exchange
public static final String PARALLEL_EXCHANGE_INSTANCE_NUM = "parallel_exchange_instance_num";
public static final String SHOW_HIDDEN_COLUMNS = "show_hidden_columns";
public static final String USE_V2_ROLLUP = "use_v2_rollup";
@ -146,12 +147,13 @@ public class SessionVariable implements Serializable, Writable {
public static final String DEFAULT_STORAGE_ENGINE = "default_storage_engine";
public static final String DEFAULT_TMP_STORAGE_ENGINE = "default_tmp_storage_engine";
// Compatible with mysql
// Compatible with mysql
public static final String PROFILLING = "profiling";
public static final String DIV_PRECISION_INCREMENT = "div_precision_increment";
// see comment of `doris_max_scan_key_num` and `max_pushdown_conditions_per_column` in BE config
// see comment of `doris_max_scan_key_num` and
// `max_pushdown_conditions_per_column` in BE config
public static final String MAX_SCAN_KEY_NUM = "max_scan_key_num";
public static final String MAX_PUSHDOWN_CONDITIONS_PER_COLUMN = "max_pushdown_conditions_per_column";
@ -160,7 +162,8 @@ public class SessionVariable implements Serializable, Writable {
// runtime filter run mode
public static final String RUNTIME_FILTER_MODE = "runtime_filter_mode";
// Size in bytes of Bloom Filters used for runtime filters. Actual size of filter will
// Size in bytes of Bloom Filters used for runtime filters. Actual size of
// filter will
// be rounded up to the nearest power of two.
public static final String RUNTIME_BLOOM_FILTER_SIZE = "runtime_bloom_filter_size";
// Minimum runtime bloom filter size, in bytes
@ -176,7 +179,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String RUNTIME_FILTERS_MAX_NUM = "runtime_filters_max_num";
// Runtime filter type used, For testing, Corresponds to TRuntimeFilterType
public static final String RUNTIME_FILTER_TYPE = "runtime_filter_type";
// if the right table is greater than this value in the hash join, we will ignore IN filter
// if the right table is greater than this value in the hash join, we will
// ignore IN filter
public static final String RUNTIME_FILTER_MAX_IN_NUM = "runtime_filter_max_in_num";
public static final String BE_NUMBER_FOR_TEST = "be_number_for_test";
@ -187,8 +191,10 @@ public class SessionVariable implements Serializable, Writable {
public static final String DELETE_WITHOUT_PARTITION = "delete_without_partition";
// set the default parallelism for send batch when execute InsertStmt operation,
// if the value for parallelism exceed `max_send_batch_parallelism_per_job` in BE config,
// then the coordinator be will use the value of `max_send_batch_parallelism_per_job`
// if the value for parallelism exceed `max_send_batch_parallelism_per_job` in
// BE config,
// then the coordinator be will use the value of
// `max_send_batch_parallelism_per_job`
public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism";
// turn off all automatic join reorder algorithms
@ -229,7 +235,8 @@ public class SessionVariable implements Serializable, Writable {
// Limit the max count of scanners to prevent generate too many scanners.
public static final String PARALLEL_SCAN_MAX_SCANNERS_COUNT = "parallel_scan_max_scanners_count";
// Avoid splitting small segments, each scanner should scan `parallel_scan_min_rows_per_scanner` rows.
// Avoid splitting small segments, each scanner should scan
// `parallel_scan_min_rows_per_scanner` rows.
public static final String PARALLEL_SCAN_MIN_ROWS_PER_SCANNER = "parallel_scan_min_rows_per_scanner";
public static final String ENABLE_LOCAL_SHUFFLE = "enable_local_shuffle";
@ -258,8 +265,7 @@ public class SessionVariable implements Serializable, Writable {
public static final String DECIMAL_OVERFLOW_SCALE = "decimal_overflow_scale";
public static final String TRIM_TAILING_SPACES_FOR_EXTERNAL_TABLE_QUERY
= "trim_tailing_spaces_for_external_table_query";
public static final String TRIM_TAILING_SPACES_FOR_EXTERNAL_TABLE_QUERY = "trim_tailing_spaces_for_external_table_query";
public static final String ENABLE_DPHYP_OPTIMIZER = "enable_dphyp_optimizer";
@ -290,8 +296,7 @@ public class SessionVariable implements Serializable, Writable {
public static final String ENABLE_FOLD_NONDETERMINISTIC_FN = "enable_fold_nondeterministic_fn";
public static final String ENABLE_RUNTIME_FILTER_PRUNE =
"enable_runtime_filter_prune";
public static final String ENABLE_RUNTIME_FILTER_PRUNE = "enable_runtime_filter_prune";
static final String SESSION_CONTEXT = "session_context";
@ -334,8 +339,7 @@ public class SessionVariable implements Serializable, Writable {
public static final String PARTITION_PRUNING_EXPAND_THRESHOLD = "partition_pruning_expand_threshold";
public static final String ENABLE_SHARE_HASH_TABLE_FOR_BROADCAST_JOIN
= "enable_share_hash_table_for_broadcast_join";
public static final String ENABLE_SHARE_HASH_TABLE_FOR_BROADCAST_JOIN = "enable_share_hash_table_for_broadcast_join";
// Optimize when probe side has no data for some hash join types
public static final String ENABLE_HASH_JOIN_EARLY_START_PROBE = "enable_hash_join_early_start_probe";
@ -383,12 +387,14 @@ public class SessionVariable implements Serializable, Writable {
public static final String DUMP_NEREIDS_MEMO = "dump_nereids_memo";
// fix replica to query. If num = 1, query the smallest replica, if 2 is the second smallest replica.
// fix replica to query. If num = 1, query the smallest replica, if 2 is the
// second smallest replica.
public static final String USE_FIX_REPLICA = "use_fix_replica";
public static final String DRY_RUN_QUERY = "dry_run_query";
// Split size for ExternalFileScanNode. Default value 0 means use the block size of HDFS/S3.
// Split size for ExternalFileScanNode. Default value 0 means use the block size
// of HDFS/S3.
public static final String FILE_SPLIT_SIZE = "file_split_size";
/**
@ -429,8 +435,7 @@ public class SessionVariable implements Serializable, Writable {
public static final String JDBC_CLICKHOUSE_QUERY_FINAL = "jdbc_clickhouse_query_final";
public static final String ENABLE_MEMTABLE_ON_SINK_NODE =
"enable_memtable_on_sink_node";
public static final String ENABLE_MEMTABLE_ON_SINK_NODE = "enable_memtable_on_sink_node";
public static final String LOAD_STREAM_PER_NODE = "load_stream_per_node";
@ -464,26 +469,22 @@ public class SessionVariable implements Serializable, Writable {
public static final String HUGE_TABLE_DEFAULT_SAMPLE_ROWS = "huge_table_default_sample_rows";
public static final String HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES = "huge_table_lower_bound_size_in_bytes";
public static final String HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS
= "huge_table_auto_analyze_interval_in_millis";
public static final String HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS = "huge_table_auto_analyze_interval_in_millis";
public static final String EXTERNAL_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS
= "external_table_auto_analyze_interval_in_millis";
public static final String EXTERNAL_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS = "external_table_auto_analyze_interval_in_millis";
public static final String TABLE_STATS_HEALTH_THRESHOLD
= "table_stats_health_threshold";
public static final String TABLE_STATS_HEALTH_THRESHOLD = "table_stats_health_threshold";
public static final String ENABLE_MATERIALIZED_VIEW_REWRITE
= "enable_materialized_view_rewrite";
public static final String ENABLE_MATERIALIZED_VIEW_REWRITE = "enable_materialized_view_rewrite";
public static final String MATERIALIZED_VIEW_REWRITE_ENABLE_CONTAIN_EXTERNAL_TABLE
= "materialized_view_rewrite_enable_contain_external_table";
public static final String MATERIALIZED_VIEW_REWRITE_ENABLE_CONTAIN_EXTERNAL_TABLE = "materialized_view_rewrite_enable_contain_external_table";
public static final String ENABLE_PUSHDOWN_MINMAX_ON_UNIQUE = "enable_pushdown_minmax_on_unique";
public static final String ENABLE_PUSHDOWN_STRING_MINMAX = "enable_pushdown_string_minmax";
// When set use fix replica = true, the fixed replica maybe bad, try to use the health one if
// When set use fix replica = true, the fixed replica maybe bad, try to use the
// health one if
// this session variable is set to true.
public static final String FALLBACK_OTHER_REPLICA_WHEN_FIXED_CORRUPT = "fallback_other_replica_when_fixed_corrupt";
@ -491,20 +492,22 @@ public class SessionVariable implements Serializable, Writable {
public static final String DESCRIBE_EXTEND_VARIANT_COLUMN = "describe_extend_variant_column";
public static final String FORCE_JNI_SCANNER = "force_jni_scanner";
public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
SKIP_DELETE_PREDICATE,
SKIP_DELETE_BITMAP,
SKIP_DELETE_SIGN,
SKIP_STORAGE_ENGINE_MERGE,
SHOW_HIDDEN_COLUMNS
);
SHOW_HIDDEN_COLUMNS);
public static final String ENABLE_STATS = "enable_stats";
/**
* If set false, user couldn't submit analyze SQL and FE won't allocate any related resources.
* If set false, user couldn't submit analyze SQL and FE won't allocate any
* related resources.
*/
@VariableMgr.VarAttr(name = ENABLE_STATS)
public boolean enableStats = true;
public boolean enableStats = true;
// session origin value
public Map<Field, String> sessionOriginValue = new HashMap<Field, String>();
@ -537,7 +540,8 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_EXCHANGE_NODE_PARALLEL_MERGE)
public boolean enableExchangeNodeParallelMerge = false;
// By default, the number of Limit items after OrderBy is changed from 65535 items
// By default, the number of Limit items after OrderBy is changed from 65535
// items
// before v1.2.0 (not included), to return all items by default
@VariableMgr.VarAttr(name = DEFAULT_ORDER_BY_LIMIT)
private long defaultOrderByLimit = -1;
@ -550,13 +554,14 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ANALYZE_TIMEOUT, flag = VariableMgr.GLOBAL, needForward = true)
public int analyzeTimeoutS = 43200;
// The global max_execution_time value provides the default for the session value for new connections.
// The session value applies to SELECT executions executed within the session that include
// The global max_execution_time value provides the default for the session
// value for new connections.
// The session value applies to SELECT executions executed within the session
// that include
// no MAX_EXECUTION_TIME(N) optimizer hint or for which N is 0.
// https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html
// So that it is == query timeout in doris
@VariableMgr.VarAttr(name = MAX_EXECUTION_TIME, checker = "checkMaxExecutionTimeMSValid",
setter = "setMaxExecutionTimeMS")
@VariableMgr.VarAttr(name = MAX_EXECUTION_TIME, checker = "checkMaxExecutionTimeMSValid", setter = "setMaxExecutionTimeMS")
public int maxExecutionTimeMS = 900000;
@VariableMgr.VarAttr(name = INSERT_TIMEOUT)
@ -573,7 +578,7 @@ public class SessionVariable implements Serializable, Writable {
public int runtimeFilterJumpThreshold = 2;
// using hashset instead of group by + count can improve performance
// but may cause rpc failed when cluster has less BE
// but may cause rpc failed when cluster has less BE
// Whether this switch is turned on depends on the BE number
@VariableMgr.VarAttr(name = ENABLE_SINGLE_DISTINCT_COLUMN_OPT)
public boolean enableSingleDistinctColumnOpt = false;
@ -643,19 +648,23 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = QUERY_CACHE_TYPE)
public int queryCacheType = 0;
// The number of seconds the server waits for activity on an interactive connection before closing it
// The number of seconds the server waits for activity on an interactive
// connection before closing it
@VariableMgr.VarAttr(name = INTERACTIVE_TIMTOUT)
public int interactiveTimeout = 3600;
// The number of seconds the server waits for activity on a noninteractive connection before closing it.
// The number of seconds the server waits for activity on a noninteractive
// connection before closing it.
@VariableMgr.VarAttr(name = WAIT_TIMEOUT)
public int waitTimeoutS = 28800;
// The number of seconds to wait for a block to be written to a connection before aborting the write
// The number of seconds to wait for a block to be written to a connection
// before aborting the write
@VariableMgr.VarAttr(name = NET_WRITE_TIMEOUT)
public int netWriteTimeout = 60;
// The number of seconds to wait for a block to be written to a connection before aborting the write
// The number of seconds to wait for a block to be written to a connection
// before aborting the write
@VariableMgr.VarAttr(name = NET_READ_TIMEOUT)
public int netReadTimeout = 60;
@ -706,12 +715,10 @@ public class SessionVariable implements Serializable, Writable {
* the parallel exec instance num for one Fragment in one BE
* 1 means disable this feature
*/
@VariableMgr.VarAttr(name = PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM, needForward = true, fuzzy = true,
setter = "setFragmentInstanceNum")
@VariableMgr.VarAttr(name = PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM, needForward = true, fuzzy = true, setter = "setFragmentInstanceNum")
public int parallelExecInstanceNum = 8;
@VariableMgr.VarAttr(name = PARALLEL_PIPELINE_TASK_NUM, fuzzy = true, needForward = true,
setter = "setPipelineTaskNum")
@VariableMgr.VarAttr(name = PARALLEL_PIPELINE_TASK_NUM, fuzzy = true, needForward = true, setter = "setPipelineTaskNum")
public int parallelPipelineTaskNum = 0;
@VariableMgr.VarAttr(name = PROFILE_LEVEL, fuzzy = true)
@ -726,10 +733,10 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_ODBC_TRANSCATION)
public boolean enableOdbcTransaction = false;
@VariableMgr.VarAttr(name = ENABLE_SCAN_RUN_SERIAL, description = {
@VariableMgr.VarAttr(name = ENABLE_SCAN_RUN_SERIAL, description = {
"是否开启ScanNode串行读,以避免limit较小的情况下的读放大,可以提高查询的并发能力",
"Whether to enable ScanNode serial reading to avoid read amplification in cases of small limits"
+ "which can improve query concurrency. default is false."})
+ "which can improve query concurrency. default is false." })
public boolean enableScanRunSerial = false;
@VariableMgr.VarAttr(name = ENABLE_SQL_CACHE)
@ -786,9 +793,8 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_NEREIDS_DML, needForward = true)
public boolean enableNereidsDML = true;
@VariableMgr.VarAttr(name = ENABLE_NEREIDS_DML_WITH_PIPELINE, needForward = true,
varType = VariableAnnotation.EXPERIMENTAL,
description = {"在新优化器中,使用pipeline引擎执行DML", "execute DML with pipeline engine in Nereids"})
@VariableMgr.VarAttr(name = ENABLE_NEREIDS_DML_WITH_PIPELINE, needForward = true, varType = VariableAnnotation.EXPERIMENTAL, description = {
"在新优化器中,使用pipeline引擎执行DML", "execute DML with pipeline engine in Nereids" })
public boolean enableNereidsDmlWithPipeline = false;
@VariableMgr.VarAttr(name = ENABLE_STRICT_CONSISTENCY_DML, needForward = true)
@ -797,41 +803,33 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_VECTORIZED_ENGINE, varType = VariableAnnotation.EXPERIMENTAL_ONLINE)
public boolean enableVectorizedEngine = true;
@VariableMgr.VarAttr(name = ENABLE_PIPELINE_ENGINE, fuzzy = true, needForward = true,
varType = VariableAnnotation.EXPERIMENTAL)
@VariableMgr.VarAttr(name = ENABLE_PIPELINE_ENGINE, fuzzy = true, needForward = true, varType = VariableAnnotation.EXPERIMENTAL)
private boolean enablePipelineEngine = true;
@VariableMgr.VarAttr(name = ENABLE_PIPELINE_X_ENGINE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL)
private boolean enablePipelineXEngine = false;
@VariableMgr.VarAttr(name = ENABLE_SHARED_SCAN, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
needForward = true)
@VariableMgr.VarAttr(name = ENABLE_SHARED_SCAN, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
private boolean enableSharedScan = false;
@VariableMgr.VarAttr(name = ENABLE_PARALLEL_SCAN, fuzzy = true, varType = VariableAnnotation.EXPERIMENTAL,
needForward = true)
@VariableMgr.VarAttr(name = ENABLE_PARALLEL_SCAN, fuzzy = true, varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
private boolean enableParallelScan = true;
@VariableMgr.VarAttr(name = PARALLEL_SCAN_MAX_SCANNERS_COUNT, fuzzy = true,
varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
@VariableMgr.VarAttr(name = PARALLEL_SCAN_MAX_SCANNERS_COUNT, fuzzy = true, varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
private int parallelScanMaxScannersCount = 48;
@VariableMgr.VarAttr(name = PARALLEL_SCAN_MIN_ROWS_PER_SCANNER, fuzzy = true,
varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
@VariableMgr.VarAttr(name = PARALLEL_SCAN_MIN_ROWS_PER_SCANNER, fuzzy = true, varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
private long parallelScanMinRowsPerScanner = 2097152; // 2MB
@VariableMgr.VarAttr(name = IGNORE_STORAGE_DATA_DISTRIBUTION, fuzzy = false,
varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
@VariableMgr.VarAttr(name = IGNORE_STORAGE_DATA_DISTRIBUTION, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
private boolean ignoreStorageDataDistribution = true;
@VariableMgr.VarAttr(
name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
description = {"是否在pipelineX引擎上开启local shuffle优化",
"Whether to enable local shuffle on pipelineX engine."})
@VariableMgr.VarAttr(name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, description = {
"是否在pipelineX引擎上开启local shuffle优化",
"Whether to enable local shuffle on pipelineX engine." })
private boolean enableLocalShuffle = true;
@VariableMgr.VarAttr(name = ENABLE_AGG_STATE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
needForward = true)
@VariableMgr.VarAttr(name = ENABLE_AGG_STATE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
public boolean enableAggState = false;
@VariableMgr.VarAttr(name = ENABLE_PARALLEL_OUTFILE)
@ -847,7 +845,8 @@ public class SessionVariable implements Serializable, Writable {
public boolean trimTailingSpacesForExternalTableQuery = false;
// the maximum size in bytes for a table that will be broadcast to all be nodes
// when performing a join, By setting this value to -1 broadcasting can be disabled.
// when performing a join, By setting this value to -1 broadcasting can be
// disabled.
// Default value is 1Gto
@VariableMgr.VarAttr(name = AUTO_BROADCAST_JOIN_THRESHOLD)
public double autoBroadcastJoinThreshold = 0.8;
@ -969,7 +968,6 @@ public class SessionVariable implements Serializable, Writable {
this.maxJoinNumberOfReorder = maxJoinNumberOfReorder;
}
@VariableMgr.VarAttr(name = MAX_JOIN_NUMBER_BUSHY_TREE)
private int maxJoinNumBushyTree = 8;
@ -993,8 +991,8 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = DECIMAL_OVERFLOW_SCALE, needForward = true, description = {
"当decimal数值计算结果精度溢出时,计算结果最多可保留的小数位数", "When the precision of the result of"
+ " a decimal numerical calculation overflows,"
+ "the maximum number of decimal scale that the result can be retained"
+ " a decimal numerical calculation overflows,"
+ "the maximum number of decimal scale that the result can be retained"
})
public int decimalOverflowScale = 6;
@ -1026,8 +1024,7 @@ public class SessionVariable implements Serializable, Writable {
* the new optimizer is fully developed. I hope that day
* would be coming soon.
*/
@VariableMgr.VarAttr(name = ENABLE_NEREIDS_PLANNER, needForward = true,
fuzzy = true, varType = VariableAnnotation.EXPERIMENTAL)
@VariableMgr.VarAttr(name = ENABLE_NEREIDS_PLANNER, needForward = true, fuzzy = true, varType = VariableAnnotation.EXPERIMENTAL)
private boolean enableNereidsPlanner = true;
@VariableMgr.VarAttr(name = DISABLE_NEREIDS_RULES, needForward = true)
@ -1067,15 +1064,16 @@ public class SessionVariable implements Serializable, Writable {
public boolean enableRuntimeFilterPrune = true;
/**
* The client can pass some special information by setting this session variable in the format: "k1:v1;k2:v2".
* For example, trace_id can be passed to trace the query request sent by the user.
* The client can pass some special information by setting this session variable
* in the format: "k1:v1;k2:v2".
* For example, trace_id can be passed to trace the query request sent by the
* user.
* set session_context="trace_id:1234565678";
*/
@VariableMgr.VarAttr(name = SESSION_CONTEXT, needForward = true)
public String sessionContext = "";
@VariableMgr.VarAttr(name = ENABLE_SINGLE_REPLICA_INSERT,
needForward = true, varType = VariableAnnotation.EXPERIMENTAL)
@VariableMgr.VarAttr(name = ENABLE_SINGLE_REPLICA_INSERT, needForward = true, varType = VariableAnnotation.EXPERIMENTAL)
public boolean enableSingleReplicaInsert = false;
@VariableMgr.VarAttr(name = ENABLE_FUNCTION_PUSHDOWN, fuzzy = true)
@ -1114,21 +1112,31 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = SKIP_DELETE_BITMAP)
public boolean skipDeleteBitmap = false;
// This variable replace the original FE config `recover_with_skip_missing_version`.
// In some scenarios, all replicas of tablet are having missing versions, and the tablet is unable to recover.
// This config can control the behavior of query. When it is set to `true`, the query will ignore the
// visible version recorded in FE partition, use the replica version. If the replica on BE has missing versions,
// the query will directly skip this missing version, and only return the data of the existing versions.
// Besides, the query will always try to select the one with the highest lastSuccessVersion among all surviving
// This variable replaces the original FE config
// `recover_with_skip_missing_version`.
// In some scenarios, all replicas of tablet are having missing versions, and
// the tablet is unable to recover.
// This config can control the behavior of query. When it is set to `true`, the
// query will ignore the
// visible version recorded in FE partition, use the replica version. If the
// replica on BE has missing versions,
// the query will directly skip this missing version, and only return the data
// of the existing versions.
// Besides, the query will always try to select the one with the highest
// lastSuccessVersion among all surviving
// BE replicas, so as to recover as much data as possible.
// You should only open it in the emergency scenarios mentioned above, only used for temporary recovery queries.
// This variable conflicts with the use_fix_replica variable, when the use_fix_replica variable is not -1,
// You should only open it in the emergency scenarios mentioned above, only used
// for temporary recovery queries.
// This variable conflicts with the use_fix_replica variable, when the
// use_fix_replica variable is not -1,
// this variable will not work.
@VariableMgr.VarAttr(name = SKIP_MISSING_VERSION)
public boolean skipMissingVersion = false;
// This variable is used to avoid FE fallback to the original parser. When we execute SQL in regression tests
// for nereids, fallback will cause the Doris return the correct result although the syntax is unsupported
// This variable is used to avoid FE fallback to the original parser. When we
// execute SQL in regression tests
// for nereids, fallback will cause the Doris return the correct result although
// the syntax is unsupported
// in nereids for some mistaken modification. You should set it on the
@VariableMgr.VarAttr(name = ENABLE_FALLBACK_TO_ORIGINAL_PLANNER, needForward = true)
public boolean enableFallbackToOriginalPlanner = true;
@ -1152,11 +1160,13 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = INTERNAL_SESSION)
public boolean internalSession = false;
// Use partitioned hash join if build side row count >= the threshold . 0 - the threshold is not set.
// Use partitioned hash join if build side row count >= the threshold . 0 - the
// threshold is not set.
@VariableMgr.VarAttr(name = PARTITIONED_HASH_JOIN_ROWS_THRESHOLD, fuzzy = true)
public int partitionedHashJoinRowsThreshold = 0;
// Use partitioned hash join if build side row count >= the threshold . 0 - the threshold is not set.
// Use partitioned hash join if build side row count >= the threshold . 0 - the
// threshold is not set.
@VariableMgr.VarAttr(name = PARTITIONED_HASH_AGG_ROWS_THRESHOLD, fuzzy = true)
public int partitionedHashAggRowsThreshold = 0;
@ -1178,23 +1188,21 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = GROUP_CONCAT_MAX_LEN)
public long groupConcatMaxLen = 2147483646;
// If the memory consumption of sort node exceed this limit, will trigger spill to disk;
// If the memory consumption of sort node exceed this limit, will trigger spill
// to disk;
// Set to 0 to disable; min: 128M
public static final long MIN_EXTERNAL_SORT_BYTES_THRESHOLD = 134217728;
@VariableMgr.VarAttr(name = EXTERNAL_SORT_BYTES_THRESHOLD,
checker = "checkExternalSortBytesThreshold", fuzzy = true)
@VariableMgr.VarAttr(name = EXTERNAL_SORT_BYTES_THRESHOLD, checker = "checkExternalSortBytesThreshold", fuzzy = true)
public long externalSortBytesThreshold = 0;
// Set to 0 to disable; min: 128M
public static final long MIN_EXTERNAL_AGG_BYTES_THRESHOLD = 134217728;
@VariableMgr.VarAttr(name = EXTERNAL_AGG_BYTES_THRESHOLD,
checker = "checkExternalAggBytesThreshold", fuzzy = true)
@VariableMgr.VarAttr(name = EXTERNAL_AGG_BYTES_THRESHOLD, checker = "checkExternalAggBytesThreshold", fuzzy = true)
public long externalAggBytesThreshold = 0;
public static final int MIN_EXTERNAL_AGG_PARTITION_BITS = 4;
public static final int MAX_EXTERNAL_AGG_PARTITION_BITS = 8;
@VariableMgr.VarAttr(name = EXTERNAL_AGG_PARTITION_BITS,
checker = "checkExternalAggPartitionBits", fuzzy = true)
@VariableMgr.VarAttr(name = EXTERNAL_AGG_PARTITION_BITS, checker = "checkExternalAggPartitionBits", fuzzy = true)
public int externalAggPartitionBits = 8; // means that the hash table will be partitioned into 256 blocks.
// Whether enable two phase read optimization
@ -1210,39 +1218,41 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = GROUP_BY_AND_HAVING_USE_ALIAS_FIRST)
public boolean groupByAndHavingUseAliasFirst = false;
// Whether enable block file cache. Only take effect when BE config item enable_file_cache is true.
// Whether enable block file cache. Only take effect when BE config item
// enable_file_cache is true.
@VariableMgr.VarAttr(name = ENABLE_FILE_CACHE, needForward = true, description = {
"是否启用file cache。该变量只有在be.conf中enable_file_cache=true时才有效,"
+ "如果be.conf中enable_file_cache=false,该BE节点的file cache处于禁用状态。",
"Set wether to use file cache. This variable takes effect only if the BE config enable_file_cache=true. "
+ "The cache is not used when BE config enable_file_cache=false."})
+ "The cache is not used when BE config enable_file_cache=false." })
public boolean enableFileCache = false;
// Specify base path for file cache, or chose a random path.
@VariableMgr.VarAttr(name = FILE_CACHE_BASE_PATH, needForward = true, description = {
"指定block file cache在BE上的存储路径,默认 'random',随机选择BE配置的存储路径。",
"Specify the storage path of the block file cache on BE, default 'random', "
+ "and randomly select the storage path configured by BE."})
+ "and randomly select the storage path configured by BE." })
public String fileCacheBasePath = "random";
// Whether enable query with inverted index.
@VariableMgr.VarAttr(name = ENABLE_INVERTED_INDEX_QUERY, needForward = true, description = {
"是否启用inverted index query。", "Set whether to use inverted index query."})
"是否启用inverted index query。", "Set whether to use inverted index query." })
public boolean enableInvertedIndexQuery = true;
// Whether enable pushdown count agg to scan node when using inverted index match.
// Whether enable pushdown count agg to scan node when using inverted index
// match.
@VariableMgr.VarAttr(name = ENABLE_PUSHDOWN_COUNT_ON_INDEX, needForward = true, description = {
"是否启用count_on_index pushdown。", "Set whether to pushdown count_on_index."})
"是否启用count_on_index pushdown。", "Set whether to pushdown count_on_index." })
public boolean enablePushDownCountOnIndex = true;
// Whether enable pushdown minmax to scan node of unique table.
@VariableMgr.VarAttr(name = ENABLE_PUSHDOWN_MINMAX_ON_UNIQUE, needForward = true, description = {
"是否启用pushdown minmax on unique table。", "Set whether to pushdown minmax on unique table."})
"是否启用pushdown minmax on unique table。", "Set whether to pushdown minmax on unique table." })
public boolean enablePushDownMinMaxOnUnique = false;
// Whether enable push down string type minmax to scan node.
@VariableMgr.VarAttr(name = ENABLE_PUSHDOWN_STRING_MINMAX, needForward = true, description = {
"是否启用string类型min max下推。", "Set whether to enable push down string type minmax."})
"是否启用string类型min max下推。", "Set whether to enable push down string type minmax." })
public boolean enablePushDownStringMinMax = false;
// Whether drop table when create table as select insert data appear error.
@ -1272,13 +1282,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_MINIDUMP)
public boolean enableMinidump = false;
@VariableMgr.VarAttr(
name = ENABLE_PAGE_CACHE,
description = {"控制是否启用page cache。默认为 true。",
"Controls whether to use page cache. "
+ "The default value is true."},
needForward = true)
@VariableMgr.VarAttr(name = ENABLE_PAGE_CACHE, description = { "控制是否启用page cache。默认为 true。",
"Controls whether to use page cache. "
+ "The default value is true." }, needForward = true)
public boolean enablePageCache = true;
@VariableMgr.VarAttr(name = ENABLE_FOLD_NONDETERMINISTIC_FN)
@ -1301,33 +1307,25 @@ public class SessionVariable implements Serializable, Writable {
public long fileSplitSize = 0;
/**
* determine should we enable unified load (use insert stmt as the backend for all load)
* determine should we enable unified load (use insert stmt as the backend for
* all load)
*/
@VariableMgr.VarAttr(name = ENABLE_UNIFIED_LOAD, needForward = true)
public boolean enableUnifiedLoad = false;
@VariableMgr.VarAttr(
name = ENABLE_PARQUET_LAZY_MAT,
description = {"控制 parquet reader 是否启用延迟物化技术。默认为 true",
"Controls whether to use lazy materialization technology in parquet reader. "
+ "The default value is true."},
needForward = true)
@VariableMgr.VarAttr(name = ENABLE_PARQUET_LAZY_MAT, description = { "控制 parquet reader 是否启用延迟物化技术。默认为 true。",
"Controls whether to use lazy materialization technology in parquet reader. "
+ "The default value is true." }, needForward = true)
public boolean enableParquetLazyMat = true;
@VariableMgr.VarAttr(
name = ENABLE_ORC_LAZY_MAT,
description = {"控制 orc reader 是否启用延迟物化技术。默认为 true。",
"Controls whether to use lazy materialization technology in orc reader. "
+ "The default value is true."},
needForward = true)
@VariableMgr.VarAttr(name = ENABLE_ORC_LAZY_MAT, description = { "控制 orc reader 是否启用延迟物化技术。默认为 true。",
"Controls whether to use lazy materialization technology in orc reader. "
+ "The default value is true." }, needForward = true)
public boolean enableOrcLazyMat = true;
@VariableMgr.VarAttr(
name = EXTERNAL_TABLE_ANALYZE_PART_NUM,
description = {"收集外表统计信息行数时选取的采样分区数,默认-1表示全部分区",
"Number of sample partition for collecting external table line number, "
+ "default -1 means all partitions"},
needForward = false)
@VariableMgr.VarAttr(name = EXTERNAL_TABLE_ANALYZE_PART_NUM, description = { "收集外表统计信息行数时选取的采样分区数,默认-1表示全部分区",
"Number of sample partition for collecting external table line number, "
+ "default -1 means all partitions" }, needForward = false)
public int externalTableAnalyzePartNum = -1;
@VariableMgr.VarAttr(name = INLINE_CTE_REFERENCED_THRESHOLD)
@ -1339,7 +1337,7 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_ANALYZE_COMPLEX_TYPE_COLUMN)
public boolean enableAnalyzeComplexTypeColumn = false;
@VariableMgr.VarAttr(name = ENABLE_STRONG_CONSISTENCY, description = {"用以开启强一致读。Doris 默认支持同一个会话内的"
@VariableMgr.VarAttr(name = ENABLE_STRONG_CONSISTENCY, description = { "用以开启强一致读。Doris 默认支持同一个会话内的"
+ "强一致性,即同一个会话内对数据的变更操作是实时可见的。如需要会话间的强一致读,则需将此变量设置为true。",
"Used to enable strong consistent reading. By default, Doris supports strong consistency "
+ "within the same session, that is, changes to data within the same session are visible in "
@ -1353,17 +1351,16 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_DELETE_SUB_PREDICATE_V2, fuzzy = true, needForward = true)
public boolean enableDeleteSubPredicateV2 = true;
@VariableMgr.VarAttr(name = TRUNCATE_CHAR_OR_VARCHAR_COLUMNS,
description = {"是否按照表的 schema 来截断 char 或者 varchar 列。默认为 false。\n"
@VariableMgr.VarAttr(name = TRUNCATE_CHAR_OR_VARCHAR_COLUMNS, description = {
"是否按照表的 schema 来截断 char 或者 varchar 列。默认为 false。\n"
+ "因为外表会存在表的 schema 中 char 或者 varchar 列的最大长度和底层 parquet 或者 orc 文件中的 schema 不一致"
+ "的情况。此时开启改选项,会按照表的 schema 中的最大长度进行截断。",
"Whether to truncate char or varchar columns according to the table's schema. "
+ "The default is false.\n"
"Whether to truncate char or varchar columns according to the table's schema. "
+ "The default is false.\n"
+ "Because the maximum length of the char or varchar column in the schema of the table"
+ " is inconsistent with the schema in the underlying parquet or orc file."
+ " is inconsistent with the schema in the underlying parquet or orc file."
+ " At this time, if the option is turned on, it will be truncated according to the maximum length"
+ " in the schema of the table."},
needForward = true)
+ " in the schema of the table." }, needForward = true)
public boolean truncateCharOrVarcharColumns = false;
@VariableMgr.VarAttr(name = ENABLE_MEMTABLE_ON_SINK_NODE, needForward = true)
@ -1375,30 +1372,28 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = GROUP_COMMIT)
public String groupCommit = "off_mode";
@VariableMgr.VarAttr(name = INVERTED_INDEX_CONJUNCTION_OPT_THRESHOLD,
description = {"在match_all中求取多个倒排索引的交集时,如果最大的倒排索引中的总数是最小倒排索引中的总数的整数倍,"
@VariableMgr.VarAttr(name = INVERTED_INDEX_CONJUNCTION_OPT_THRESHOLD, description = {
"在match_all中求取多个倒排索引的交集时,如果最大的倒排索引中的总数是最小倒排索引中的总数的整数倍,"
+ "则使用跳表来优化交集操作。",
"When intersecting multiple inverted indexes in match_all,"
"When intersecting multiple inverted indexes in match_all,"
+ " if the maximum total count of the largest inverted index"
+ " is a multiple of the minimum total count of the smallest inverted index,"
+ " use a skiplist to optimize the intersection."})
+ " use a skiplist to optimize the intersection." })
public int invertedIndexConjunctionOptThreshold = 1000;
@VariableMgr.VarAttr(name = INVERTED_INDEX_MAX_EXPANSIONS,
description = {"这个参数用来限制查询时扩展的词项(terms)的数量,以此来控制查询的性能",
"This parameter is used to limit the number of term expansions during a query,"
+ " thereby controlling query performance"})
@VariableMgr.VarAttr(name = INVERTED_INDEX_MAX_EXPANSIONS, description = { "这个参数用来限制查询时扩展的词项(terms)的数量,以此来控制查询的性能",
"This parameter is used to limit the number of term expansions during a query,"
+ " thereby controlling query performance" })
public int invertedIndexMaxExpansions = 50;
@VariableMgr.VarAttr(name = INVERTED_INDEX_SKIP_THRESHOLD,
description = {"在倒排索引中如果预估命中量占比总量超过百分比阈值,则跳过索引直接进行匹配。",
"In the inverted index,"
+ " if the estimated hit ratio exceeds the percentage threshold of the total amount, "
+ " then skip the index and proceed directly to matching."})
@VariableMgr.VarAttr(name = INVERTED_INDEX_SKIP_THRESHOLD, description = { "在倒排索引中如果预估命中量占比总量超过百分比阈值,则跳过索引直接进行匹配。",
"In the inverted index,"
+ " if the estimated hit ratio exceeds the percentage threshold of the total amount, "
+ " then skip the index and proceed directly to matching." })
public int invertedIndexSkipThreshold = 50;
@VariableMgr.VarAttr(name = SQL_DIALECT, needForward = true, checker = "checkSqlDialect",
description = {"解析sql使用的方言", "The dialect used to parse sql."})
@VariableMgr.VarAttr(name = SQL_DIALECT, needForward = true, checker = "checkSqlDialect", description = {
"解析sql使用的方言", "The dialect used to parse sql." })
public String sqlDialect = "doris";
@VariableMgr.VarAttr(name = ENABLE_UNIQUE_KEY_PARTIAL_UPDATE, needForward = true)
@ -1407,48 +1402,42 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = TEST_QUERY_CACHE_HIT, description = {
"用于测试查询缓存是否命中,如果未命中指定类型的缓存,则会报错",
"Used to test whether the query cache is hit. "
+ "If the specified type of cache is not hit, an error will be reported."},
options = {"none", "sql_cache", "partition_cache"})
+ "If the specified type of cache is not hit, an error will be reported." }, options = { "none",
"sql_cache", "partition_cache" })
public String testQueryCacheHit = "none";
@VariableMgr.VarAttr(name = ENABLE_AUTO_ANALYZE,
description = {"该参数控制是否开启自动收集", "Set false to disable auto analyze"},
flag = VariableMgr.GLOBAL)
@VariableMgr.VarAttr(name = ENABLE_AUTO_ANALYZE, description = { "该参数控制是否开启自动收集",
"Set false to disable auto analyze" }, flag = VariableMgr.GLOBAL)
public boolean enableAutoAnalyze = true;
@VariableMgr.VarAttr(name = AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD,
description = {"参与自动收集的最大表宽度,列数多于这个参数的表不参与自动收集",
"Maximum table width to enable auto analyze, "
+ "table with more columns than this value will not be auto analyzed."},
flag = VariableMgr.GLOBAL)
@VariableMgr.VarAttr(name = AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD, description = { "参与自动收集的最大表宽度,列数多于这个参数的表不参与自动收集",
"Maximum table width to enable auto analyze, "
+ "table with more columns than this value will not be auto analyzed." }, flag = VariableMgr.GLOBAL)
public int autoAnalyzeTableWidthThreshold = 70;
@VariableMgr.VarAttr(name = AUTO_ANALYZE_START_TIME, needForward = true, checker = "checkAnalyzeTimeFormat",
description = {"该参数定义自动ANALYZE例程的开始时间",
"This parameter defines the start time for the automatic ANALYZE routine."},
flag = VariableMgr.GLOBAL)
@VariableMgr.VarAttr(name = AUTO_ANALYZE_START_TIME, needForward = true, checker = "checkAnalyzeTimeFormat", description = {
"该参数定义自动ANALYZE例程的开始时间",
"This parameter defines the start time for the automatic ANALYZE routine." }, flag = VariableMgr.GLOBAL)
public String autoAnalyzeStartTime = "00:00:00";
@VariableMgr.VarAttr(name = AUTO_ANALYZE_END_TIME, needForward = true, checker = "checkAnalyzeTimeFormat",
description = {"该参数定义自动ANALYZE例程的结束时间",
"This parameter defines the end time for the automatic ANALYZE routine."},
flag = VariableMgr.GLOBAL)
@VariableMgr.VarAttr(name = AUTO_ANALYZE_END_TIME, needForward = true, checker = "checkAnalyzeTimeFormat", description = {
"该参数定义自动ANALYZE例程的结束时间",
"This parameter defines the end time for the automatic ANALYZE routine." }, flag = VariableMgr.GLOBAL)
public String autoAnalyzeEndTime = "23:59:59";
@VariableMgr.VarAttr(name = FASTER_FLOAT_CONVERT,
description = {"是否启用更快的浮点数转换算法,注意会影响输出格式", "Set true to enable faster float pointer number convert"})
@VariableMgr.VarAttr(name = FASTER_FLOAT_CONVERT, description = { "是否启用更快的浮点数转换算法,注意会影响输出格式",
"Set true to enable faster float pointer number convert" })
public boolean fasterFloatConvert = false;
@VariableMgr.VarAttr(name = IGNORE_RUNTIME_FILTER_IDS,
description = {"在IGNORE_RUNTIME_FILTER_IDS列表中的runtime filter将不会被生成",
"the runtime filter id in IGNORE_RUNTIME_FILTER_IDS list will not be generated"})
@VariableMgr.VarAttr(name = IGNORE_RUNTIME_FILTER_IDS, description = {
"在IGNORE_RUNTIME_FILTER_IDS列表中的runtime filter将不会被生成",
"the runtime filter id in IGNORE_RUNTIME_FILTER_IDS list will not be generated" })
public String ignoreRuntimeFilterIds = "";
@VariableMgr.VarAttr(name = STATS_INSERT_MERGE_ITEM_COUNT, flag = VariableMgr.GLOBAL, description = {
"控制统计信息相关INSERT攒批数量", "Controls the batch size for stats INSERT merging."
}
)
})
public int statsInsertMergeItemCount = 200;
@VariableMgr.VarAttr(name = HUGE_TABLE_DEFAULT_SAMPLE_ROWS, flag = VariableMgr.GLOBAL, description = {
@ -1459,50 +1448,52 @@ public class SessionVariable implements Serializable, Writable {
})
public long hugeTableDefaultSampleRows = 4194304;
@VariableMgr.VarAttr(name = HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES, flag = VariableMgr.GLOBAL,
description = {
"大小超过该值的表将会自动通过采样收集统计信息",
"This defines the lower size bound for large tables. "
+ "When enable_auto_sample is enabled, tables"
+ "larger than this value will automatically collect "
+ "statistics through sampling"})
// Global variable: lower size bound (in bytes) used to classify a table as "huge".
// The default value is 0.
// The English description pieces are concatenated at compile time, so every piece
// that is followed by another word must end with a space.
@VariableMgr.VarAttr(name = HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES, flag = VariableMgr.GLOBAL, description = {
        "大小超过该值的表将会自动通过采样收集统计信息",
        "This defines the lower size bound for large tables. "
                + "When enable_auto_sample is enabled, tables "
                + "larger than this value will automatically collect "
                + "statistics through sampling" })
public long hugeTableLowerBoundSizeInBytes = 0;
@VariableMgr.VarAttr(name = HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS, flag = VariableMgr.GLOBAL,
description = {"控制对大表的自动ANALYZE的最小时间间隔,"
@VariableMgr.VarAttr(name = HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS, flag = VariableMgr.GLOBAL, description = {
"控制对大表的自动ANALYZE的最小时间间隔,"
+ "在该时间间隔内大小超过huge_table_lower_bound_size_in_bytes的表仅ANALYZE一次",
"This controls the minimum time interval for automatic ANALYZE on large tables."
+ "Within this interval,"
+ "tables larger than huge_table_lower_bound_size_in_bytes are analyzed only once."})
"This controls the minimum time interval for automatic ANALYZE on large tables."
+ "Within this interval,"
+ "tables larger than huge_table_lower_bound_size_in_bytes are analyzed only once." })
public long hugeTableAutoAnalyzeIntervalInMillis = TimeUnit.HOURS.toMillis(0);
@VariableMgr.VarAttr(name = EXTERNAL_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS, flag = VariableMgr.GLOBAL,
description = {"控制对外表的自动ANALYZE的最小时间间隔,在该时间间隔内的外表仅ANALYZE一次",
"This controls the minimum time interval for automatic ANALYZE on external tables."
+ "Within this interval, external tables are analyzed only once."})
// Global variable: minimum interval, in milliseconds, between automatic ANALYZE runs
// on external tables. The default is 24 hours expressed in milliseconds.
// The two English description pieces are concatenated at compile time, so the first
// one must end with a space to keep the sentences separated.
@VariableMgr.VarAttr(name = EXTERNAL_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS, flag = VariableMgr.GLOBAL, description = {
        "控制对外表的自动ANALYZE的最小时间间隔,在该时间间隔内的外表仅ANALYZE一次",
        "This controls the minimum time interval for automatic ANALYZE on external tables. "
                + "Within this interval, external tables are analyzed only once." })
public long externalTableAutoAnalyzeIntervalInMillis = TimeUnit.HOURS.toMillis(24);
@VariableMgr.VarAttr(name = TABLE_STATS_HEALTH_THRESHOLD, flag = VariableMgr.GLOBAL,
description = {"取值在0-100之间,当自上次统计信息收集操作之后"
@VariableMgr.VarAttr(name = TABLE_STATS_HEALTH_THRESHOLD, flag = VariableMgr.GLOBAL, description = {
"取值在0-100之间,当自上次统计信息收集操作之后"
+ "数据更新量达到 (100 - table_stats_health_threshold)% ,认为该表的统计信息已过时",
"The value should be between 0 and 100. When the data update quantity "
+ "exceeds (100 - table_stats_health_threshold)% since the last "
+ "statistics collection operation, the statistics for this table are"
+ "considered outdated."})
"The value should be between 0 and 100. When the data update quantity "
+ "exceeds (100 - table_stats_health_threshold)% since the last "
+ "statistics collection operation, the statistics for this table are"
+ "considered outdated." })
public int tableStatsHealthThreshold = 60;
@VariableMgr.VarAttr(name = ENABLE_MATERIALIZED_VIEW_REWRITE, needForward = true,
description = {"是否开启基于结构信息的物化视图透明改写",
"Whether to enable materialized view rewriting based on struct info"})
@VariableMgr.VarAttr(name = ENABLE_MATERIALIZED_VIEW_REWRITE, needForward = true, description = {
"是否开启基于结构信息的物化视图透明改写",
"Whether to enable materialized view rewriting based on struct info" })
public boolean enableMaterializedViewRewrite = false;
@VariableMgr.VarAttr(name = MATERIALIZED_VIEW_REWRITE_ENABLE_CONTAIN_EXTERNAL_TABLE, needForward = true,
description = {"基于结构信息的透明改写,是否使用包含外表的物化视图",
"whether to use a materialized view that contains the foreign table "
+ "when using rewriting based on struct info"})
@VariableMgr.VarAttr(name = MATERIALIZED_VIEW_REWRITE_ENABLE_CONTAIN_EXTERNAL_TABLE, needForward = true, description = {
"基于结构信息的透明改写,是否使用包含外表的物化视图",
"whether to use a materialized view that contains the foreign table "
+ "when using rewriting based on struct info" })
public boolean materializedViewRewriteEnableContainExternalTable = false;
// Session variable registered under FORCE_JNI_SCANNER; defaults to false.
// Per its description, setting it forces external tables to be read in JNI mode.
// Accessed through isForceJniScanner() / setForceJniScanner(boolean).
@VariableMgr.VarAttr(name = FORCE_JNI_SCANNER, description = { "强制使用jni方式读取外表",
"Force the use of jni mode to read external table" })
private boolean forceJniScanner = false;
public static final String IGNORE_RUNTIME_FILTER_IDS = "ignore_runtime_filter_ids";
public Set<Integer> getIgnoredRuntimeFilterIds() {
@ -1512,7 +1503,7 @@ public class SessionVariable implements Serializable, Writable {
try {
res = Integer.valueOf(v);
} catch (Exception e) {
//ignore it
// ignore it
}
return res;
}).collect(ImmutableSet.toImmutableSet());
@ -1532,21 +1523,21 @@ public class SessionVariable implements Serializable, Writable {
this.ignoreShapePlanNodes = ignoreShapePlanNodes;
}
@VariableMgr.VarAttr(name = IGNORE_SHAPE_NODE,
description = {"'explain shape plan' 命令中忽略的PlanNode 类型",
"the plan node type which is ignored in 'explain shape plan' command"})
@VariableMgr.VarAttr(name = IGNORE_SHAPE_NODE, description = { "'explain shape plan' 命令中忽略的PlanNode 类型",
"the plan node type which is ignored in 'explain shape plan' command" })
public String ignoreShapePlanNodes = "";
@VariableMgr.VarAttr(name = ENABLE_DECIMAL256, needForward = true, description = { "控制是否在计算过程中使用Decimal256类型",
"Set to true to enable Decimal256 type" })
public boolean enableDecimal256 = false;
@VariableMgr.VarAttr(name = FALLBACK_OTHER_REPLICA_WHEN_FIXED_CORRUPT, needForward = true,
description = { "当开启use_fix_replica时遇到故障,是否漂移到其他健康的副本",
"use other health replica when the use_fix_replica meet error" })
@VariableMgr.VarAttr(name = FALLBACK_OTHER_REPLICA_WHEN_FIXED_CORRUPT, needForward = true, description = {
"当开启use_fix_replica时遇到故障,是否漂移到其他健康的副本",
"use other health replica when the use_fix_replica meet error" })
public boolean fallbackOtherReplicaWhenFixedCorrupt = false;
// If this fe is in fuzzy mode, then will use initFuzzyModeVariables to generate some variables,
// If this fe is in fuzzy mode, then will use initFuzzyModeVariables to generate
// some variables,
// not the default value set in the code.
@SuppressWarnings("checkstyle:Indentation")
public void initFuzzyModeVariables() {
@ -1573,29 +1564,30 @@ public class SessionVariable implements Serializable, Writable {
this.enableDeleteSubPredicateV2 = true;
}
/*
switch (randomInt) {
case 0:
this.externalSortBytesThreshold = 0;
this.externalAggBytesThreshold = 0;
break;
case 1:
this.externalSortBytesThreshold = 1;
this.externalAggBytesThreshold = 1;
this.externalAggPartitionBits = 6;
break;
case 2:
this.externalSortBytesThreshold = 1024 * 1024;
this.externalAggBytesThreshold = 1024 * 1024;
this.externalAggPartitionBits = 8;
break;
default:
this.externalSortBytesThreshold = 100 * 1024 * 1024 * 1024;
this.externalAggBytesThreshold = 100 * 1024 * 1024 * 1024;
this.externalAggPartitionBits = 4;
break;
}
*/
// pull_request_id default value is 0. When it is 0, use default (global) session variable.
* switch (randomInt) {
* case 0:
* this.externalSortBytesThreshold = 0;
* this.externalAggBytesThreshold = 0;
* break;
* case 1:
* this.externalSortBytesThreshold = 1;
* this.externalAggBytesThreshold = 1;
* this.externalAggPartitionBits = 6;
* break;
* case 2:
* this.externalSortBytesThreshold = 1024 * 1024;
* this.externalAggBytesThreshold = 1024 * 1024;
* this.externalAggPartitionBits = 8;
* break;
* default:
* this.externalSortBytesThreshold = 100 * 1024 * 1024 * 1024;
* this.externalAggBytesThreshold = 100 * 1024 * 1024 * 1024;
* this.externalAggPartitionBits = 4;
* break;
* }
*/
// pull_request_id default value is 0. When it is 0, use default (global)
// session variable.
if (Config.pull_request_id > 0) {
this.enablePipelineEngine = true;
this.enableNereidsPlanner = true;
@ -1676,7 +1668,8 @@ public class SessionVariable implements Serializable, Writable {
/**
* syntax:
* all -> use all event
* all except event_1, event_2, ..., event_n -> use all events excluding the event_1~n
* all except event_1, event_2, ..., event_n -> use all events excluding the
* event_1~n
* event_1, event_2, ..., event_n -> use event_1~n
*/
@VariableMgr.VarAttr(name = NEREIDS_TRACE_EVENT_MODE, checker = "checkNereidsTraceEventMode")
@ -1755,7 +1748,6 @@ public class SessionVariable implements Serializable, Writable {
return insertTimeoutS;
}
/**
 * Sets the {@code insertTimeoutS} field of this session variable object.
 *
 * @param insertTimeoutS the new value (presumably a timeout in seconds, judging by
 *                       the name; confirm against the field declaration)
 */
public void setInsertTimeoutS(int insertTimeoutS) {
this.insertTimeoutS = insertTimeoutS;
}
@ -2618,9 +2610,8 @@ public class SessionVariable implements Serializable, Writable {
public void checkQueryTimeoutValid(String newQueryTimeout) {
int value = Integer.valueOf(newQueryTimeout);
if (value <= 0) {
UnsupportedOperationException exception =
new UnsupportedOperationException(
"query_timeout can not be set to " + newQueryTimeout + ", it must be greater than 0");
UnsupportedOperationException exception = new UnsupportedOperationException(
"query_timeout can not be set to " + newQueryTimeout + ", it must be greater than 0");
LOG.warn("Check query_timeout failed", exception);
throw exception;
}
@ -2629,10 +2620,9 @@ public class SessionVariable implements Serializable, Writable {
public void checkMaxExecutionTimeMSValid(String newValue) {
int value = Integer.valueOf(newValue);
if (value < 1000) {
UnsupportedOperationException exception =
new UnsupportedOperationException(
"max_execution_time can not be set to " + newValue
+ ", it must be greater or equal to 1000, the time unit is millisecond");
UnsupportedOperationException exception = new UnsupportedOperationException(
"max_execution_time can not be set to " + newValue
+ ", it must be greater or equal to 1000, the time unit is millisecond");
LOG.warn("Check max_execution_time failed", exception);
throw exception;
}
@ -2675,10 +2665,9 @@ public class SessionVariable implements Serializable, Writable {
}
public void setMaxTableCountUseCascadesJoinReorder(int maxTableCountUseCascadesJoinReorder) {
this.maxTableCountUseCascadesJoinReorder =
maxTableCountUseCascadesJoinReorder < MIN_JOIN_REORDER_TABLE_COUNT
? MIN_JOIN_REORDER_TABLE_COUNT
: maxTableCountUseCascadesJoinReorder;
this.maxTableCountUseCascadesJoinReorder = maxTableCountUseCascadesJoinReorder < MIN_JOIN_REORDER_TABLE_COUNT
? MIN_JOIN_REORDER_TABLE_COUNT
: maxTableCountUseCascadesJoinReorder;
}
public boolean isShowUserDefaultRole() {
@ -2874,7 +2863,6 @@ public class SessionVariable implements Serializable, Writable {
Text.writeString(out, root.toString());
}
public void readFields(DataInput in) throws IOException {
String json = Text.readString(in);
readFromJson(json);
@ -3261,4 +3249,12 @@ public class SessionVariable implements Serializable, Writable {
/**
 * Sets the {@code ignoreStorageDataDistribution} field of this session variable object.
 *
 * @param ignoreStorageDataDistribution the new value of the flag
 */
public void setIgnoreStorageDataDistribution(boolean ignoreStorageDataDistribution) {
this.ignoreStorageDataDistribution = ignoreStorageDataDistribution;
}
/**
 * Returns the current value of the {@code force_jni_scanner} session variable.
 *
 * @return {@code true} if reading external tables in JNI mode is forced
 */
public boolean isForceJniScanner() {
    return this.forceJniScanner;
}
/**
 * Updates the {@code force_jni_scanner} session variable.
 *
 * @param force {@code true} to force JNI mode when reading external tables
 */
public void setForceJniScanner(boolean force) {
    this.forceJniScanner = force;
}
}

View File

@ -0,0 +1,17 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !c1 --
1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true 2023-08-10T14:35:38.768
1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true 2023-08-10T15:58:54.364
-- !c2 --
7 Hugo
8 Stop
-- !c3 --
1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true 2023-08-10T14:35:38.768
1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true 2023-08-10T15:58:54.364
-- !c4 --
7 Hugo
8 Stop

View File

@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Regression suite: queries Paimon tables stored on COS and OSS through two Paimon
// catalogs, once with force_jni_scanner=false and once with force_jni_scanner=true.
// The qt_c1..qt_c4 tags identify the result sets compared by the framework.
suite("paimon_base_filesystem", "p2,external,paimon,external_remote,external_remote_paimon") {
// The whole suite is a no-op unless enableExternalPaimonTest is configured as "true".
String enabled = context.config.otherConfigs.get("enableExternalPaimonTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
String catalog_cos = "paimon_cos"
String catalog_oss = "paimon_oss"
// Credentials for the OSS catalog come from the aliYunAk / aliYunSk config entries.
String ak = context.config.otherConfigs.get("aliYunAk")
String sk = context.config.otherConfigs.get("aliYunSk")
// Credentials for the COS catalog come from the framework's S3 AK/SK helpers.
String s3ak = getS3AK()
String s3sk = getS3SK()

// Queries under test; both use ORDER BY so the result order is deterministic.
def cos = """select c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c18 from ${catalog_cos}.zd.all_table order by c18"""
def oss = """select * from ${catalog_oss}.paimonossdb1.test_tableoss order by a"""

// Drop any leftover catalogs first so the CREATE statements below start clean.
sql """drop catalog if exists ${catalog_cos};"""
sql """drop catalog if exists ${catalog_oss};"""
// COS-backed catalog: cosn:// warehouse with cos.* properties.
sql """
create catalog if not exists ${catalog_cos} properties (
"type" = "paimon",
"warehouse" = "cosn://paimon-1308700295/paimoncos",
"cos.access_key" = "${s3ak}",
"cos.secret_key" = "${s3sk}",
"cos.endpoint" = "cos.ap-beijing.myqcloud.com"
);
"""
// OSS-backed catalog: oss:// warehouse with oss.* properties.
sql """
create catalog if not exists ${catalog_oss} properties (
"type" = "paimon",
"warehouse" = "oss://paimon-zd/paimonoss",
"oss.endpoint"="oss-cn-beijing.aliyuncs.com",
"oss.access_key"="${ak}",
"oss.secret_key"="${sk}"
);
"""
logger.info("catalog " + catalog_cos + " created")
logger.info("catalog " + catalog_oss + " created")

// First pass: JNI mode is not forced (presumably the native reader path; confirm).
sql """set force_jni_scanner=false"""
qt_c1 cos
qt_c2 oss

// Second pass: the same queries with JNI mode forced.
sql """set force_jni_scanner=true"""
qt_c3 cos
qt_c4 oss

// Restore the session variable to false once both passes are done.
sql """set force_jni_scanner=false"""
}
}