[feature](serde) support presto compatible output format (#37039) (#37253)

bp #37039
This commit is contained in:
Mingyu Chen
2024-07-04 13:56:05 +08:00
committed by GitHub
parent 3613413a54
commit ceef9ee123
82 changed files with 8639 additions and 8313 deletions

View File

@ -36,6 +36,7 @@ import org.apache.doris.thrift.TGroupCommitMode;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TResourceLimit;
import org.apache.doris.thrift.TRuntimeFilterType;
import org.apache.doris.thrift.TSerdeDialect;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
@ -485,6 +486,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String SQL_DIALECT = "sql_dialect";
public static final String SERDE_DIALECT = "serde_dialect";
public static final String EXPAND_RUNTIME_FILTER_BY_INNER_JION = "expand_runtime_filter_by_inner_join";
public static final String TEST_QUERY_CACHE_HIT = "test_query_cache_hit";
@ -1598,16 +1601,22 @@ public class SessionVariable implements Serializable, Writable {
public int invertedIndexMaxExpansions = 50;
@VariableMgr.VarAttr(name = INVERTED_INDEX_SKIP_THRESHOLD,
description = {"在倒排索引中如果预估命中量占比总量超过百分比阈值,则跳过索引直接进行匹配。",
"In the inverted index,"
+ " if the estimated hit ratio exceeds the percentage threshold of the total amount, "
+ " then skip the index and proceed directly to matching."})
description = {"在倒排索引中如果预估命中量占比总量超过百分比阈值,则跳过索引直接进行匹配。",
"In the inverted index,"
+ " if the estimated hit ratio exceeds the percentage threshold of the total amount, "
+ " then skip the index and proceed directly to matching."})
public int invertedIndexSkipThreshold = 50;
@VariableMgr.VarAttr(name = SQL_DIALECT, needForward = true, checker = "checkSqlDialect",
description = {"解析sql使用的方言", "The dialect used to parse sql."})
public String sqlDialect = "doris";
@VariableMgr.VarAttr(name = SERDE_DIALECT, needForward = true, checker = "checkSerdeDialect",
description = {"返回给 MySQL 客户端时各数据类型的输出格式方言",
"The output format dialect of each data type returned to the MySQL client."},
options = {"doris", "presto", "trino"})
public String serdeDialect = "doris";
@VariableMgr.VarAttr(name = ENABLE_UNIQUE_KEY_PARTIAL_UPDATE, needForward = true)
public boolean enableUniqueKeyPartialUpdate = false;
@ -3345,6 +3354,7 @@ public class SessionVariable implements Serializable, Writable {
tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);
tResult.setReadCsvEmptyLineAsNull(readCsvEmptyLineAsNull);
tResult.setSerdeDialect(getSerdeDialect());
return tResult;
}
@ -3795,6 +3805,20 @@ public class SessionVariable implements Serializable, Writable {
}
}
public void checkSerdeDialect(String serdeDialect) {
if (StringUtils.isEmpty(serdeDialect)) {
LOG.warn("serdeDialect value is empty");
throw new UnsupportedOperationException("serdeDialect value is empty");
}
if (!serdeDialect.equalsIgnoreCase("doris") && !serdeDialect.equalsIgnoreCase("presto")
&& !serdeDialect.equalsIgnoreCase("trino")) {
LOG.warn("serdeDialect value is invalid, the invalid value is {}", serdeDialect);
throw new UnsupportedOperationException(
"sqlDialect value is invalid, the invalid value is " + serdeDialect);
}
}
public boolean isEnableInsertGroupCommit() {
return Config.wait_internal_group_commit_finish
|| GroupCommitBlockSink.parseGroupCommit(groupCommit) == TGroupCommitMode.ASYNC_MODE
@ -3876,4 +3900,16 @@ public class SessionVariable implements Serializable, Writable {
public int getMaxMsgSizeOfResultReceiver() {
return this.maxMsgSizeOfResultReceiver;
}
private TSerdeDialect getSerdeDialect() {
switch (serdeDialect) {
case "doris":
return TSerdeDialect.DORIS;
case "presto":
case "trino":
return TSerdeDialect.PRESTO;
default:
throw new IllegalArgumentException("Unknown serde dialect: " + serdeDialect);
}
}
}