diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_jni_reader.cpp index daf1be1cf0..81726d6a9c 100644 --- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp @@ -40,7 +40,8 @@ const std::string PaimonJniReader::HADOOP_OPTION_PREFIX = "hadoop."; PaimonJniReader::PaimonJniReader(const std::vector& file_slot_descs, RuntimeState* state, RuntimeProfile* profile, - const TFileRangeDesc& range) + const TFileRangeDesc& range, + const TFileScanRangeParams* range_params) : JniReader(file_slot_descs, state, profile) { std::vector column_names; std::vector column_types; @@ -61,8 +62,8 @@ PaimonJniReader::PaimonJniReader(const std::vector& file_slot_d std::to_string(range.table_format_params.paimon_params.last_update_time); params["required_fields"] = join(column_names, ","); params["columns_types"] = join(column_types, "#"); - if (range.table_format_params.paimon_params.__isset.paimon_table) { - params["paimon_table"] = range.table_format_params.paimon_params.paimon_table; + if (range_params->__isset.serialized_table) { + params["serialized_table"] = range_params->serialized_table; } // Used to create paimon option diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.h b/be/src/vec/exec/format/table/paimon_jni_reader.h index 6ecf6cd1f1..220a6f1f2e 100644 --- a/be/src/vec/exec/format/table/paimon_jni_reader.h +++ b/be/src/vec/exec/format/table/paimon_jni_reader.h @@ -53,7 +53,8 @@ public: static const std::string PAIMON_OPTION_PREFIX; static const std::string HADOOP_OPTION_PREFIX; PaimonJniReader(const std::vector& file_slot_descs, RuntimeState* state, - RuntimeProfile* profile, const TFileRangeDesc& range); + RuntimeProfile* profile, const TFileRangeDesc& range, + const TFileScanRangeParams* range_params); ~PaimonJniReader() override = default; diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 
407de80c9f..acd590ba95 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -812,8 +812,8 @@ Status VFileScanner::_get_next_reader() { _cur_reader = std::move(mc_reader); } else if (range.__isset.table_format_params && range.table_format_params.table_format_type == "paimon") { - _cur_reader = - PaimonJniReader::create_unique(_file_slot_descs, _state, _profile, range); + _cur_reader = PaimonJniReader::create_unique(_file_slot_descs, _state, _profile, + range, _params); init_status = ((PaimonJniReader*)(_cur_reader.get())) ->init_reader(_colname_to_value_range); } else if (range.__isset.table_format_params && diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java index e85d465f66..6ffd5f1ad9 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java @@ -224,8 +224,8 @@ public class PaimonJniScanner extends JniScanner { } private void initTable() { - if (params.containsKey("paimon_table")) { - table = PaimonUtils.deserialize(params.get("paimon_table")); + if (params.containsKey("serialized_table")) { + table = PaimonUtils.deserialize(params.get("serialized_table")); } else { PaimonTableCacheKey key = new PaimonTableCacheKey(ctlId, dbId, tblId, paimonOptionParams, hadoopOptionParams, dbName, tblName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index 116f0a795f..bd312d0d61 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -76,6 +76,7 @@ import java.util.ArrayList; import 
java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Optional; /** * FileQueryScanNode for querying the file access type of catalog, now only support @@ -261,6 +262,11 @@ public abstract class FileQueryScanNode extends FileScanNode { protected void setScanParams(TFileRangeDesc rangeDesc, Split split) { } + // Serialize the table to be scanned to BE's jni reader + protected Optional getSerializedTable() { + return Optional.empty(); + } + @Override public void createScanRangeLocations() throws UserException { long start = System.currentTimeMillis(); @@ -366,6 +372,8 @@ public abstract class FileQueryScanNode extends FileScanNode { } } + getSerializedTable().ifPresent(params::setSerializedTable); + if (ConnectContext.get().getExecutor() != null) { ConnectContext.get().getExecutor().getSummaryProfile().setCreateScanRangeFinishTime(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index b61964481d..59f51c8425 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -102,6 +102,7 @@ public class PaimonScanNode extends FileQueryScanNode { private int paimonSplitNum = 0; private List splitStats = new ArrayList<>(); private SessionVariable sessionVariable; + private String serializedTable; public PaimonScanNode(PlanNodeId id, TupleDescriptor desc, @@ -115,6 +116,7 @@ public class PaimonScanNode extends FileQueryScanNode { protected void doInitialize() throws UserException { super.doInitialize(); source = new PaimonSource(desc); + serializedTable = encodeObjectToString(source.getPaimonTable()); Preconditions.checkNotNull(source); } @@ -144,6 +146,11 @@ public class PaimonScanNode extends FileQueryScanNode { } } + @Override + protected Optional getSerializedTable() { 
+ return Optional.of(serializedTable); + } + private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) { TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value()); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 8ba89387d9..9ded9a31ea 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -330,7 +330,7 @@ struct TPaimonFileDesc { 11: optional string file_format 12: optional TPaimonDeletionFileDesc deletion_file; 13: optional map hadoop_conf // deprecated - 14: optional string paimon_table + 14: optional string paimon_table // deprecated } struct TMaxComputeFileDesc { @@ -420,6 +420,13 @@ struct TFileScanRangeParams { 20: optional list pre_filter_exprs_list 21: optional Types.TUniqueId load_id 22: optional TTextSerdeType text_serde_type + // used by flexible partial update + 23: optional string sequence_map_col + // table from FE, used for jni scanner + // BE can use the table directly: + // 1. Reduce the access to HMS and HDFS on the JNI side. + // 2. There will be no inconsistency between the fe and be tables. + 24: optional string serialized_table } struct TFileRangeDesc {