diff --git a/be/src/vec/exec/format/table/paimon_reader.cpp b/be/src/vec/exec/format/table/paimon_reader.cpp index f1ebb96fa1..4d6cfc96b5 100644 --- a/be/src/vec/exec/format/table/paimon_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_reader.cpp @@ -42,9 +42,10 @@ PaimonJniReader::PaimonJniReader(const std::vector& file_slot_d const TFileRangeDesc& range) : _file_slot_descs(file_slot_descs), _state(state), _profile(profile) { std::vector column_names; + std::vector column_types; for (auto& desc : _file_slot_descs) { - std::string field = desc->col_name(); - column_names.emplace_back(field); + column_names.emplace_back(desc->col_name()); + column_types.emplace_back(JniConnector::get_jni_type(desc->type())); } std::map params; params["db_name"] = range.table_format_params.paimon_params.db_name; @@ -57,6 +58,8 @@ PaimonJniReader::PaimonJniReader(const std::vector& file_slot_d params["tbl_id"] = std::to_string(range.table_format_params.paimon_params.tbl_id); params["last_update_time"] = std::to_string(range.table_format_params.paimon_params.last_update_time); + params["required_fields"] = join(column_names, ","); + params["columns_types"] = join(column_types, "#"); // Used to create paimon option for (auto& kv : range.table_format_params.paimon_params.paimon_options) { diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java index ec7d108bd3..4bb4db86d3 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java @@ -1034,6 +1034,9 @@ public class VectorColumn { public int appendBytesAndOffset(byte[] src, int offset, int length) { int startOffset = childColumns[0].appendBytes(src, offset, length); reserve(appendIndex + 1); + if (startOffset + length < 0) { + throw new RuntimeException("String overflow, offset=" + startOffset + ", length=" + length); + } OffHeap.putInt(null, offsets + 4L * appendIndex, startOffset + length); return appendIndex++; } @@ -1068,6 +1071,9 @@ public class VectorColumn { bytes = new byte[0]; } int startOffset = childColumns[0].appendBytes(bytes, 0, bytes.length); + if (startOffset + bytes.length < 0) { + throw new RuntimeException("Binary overflow, offset=" + startOffset + ", length=" + bytes.length); + } OffHeap.putInt(null, offsets + 4L * appendIndex, startOffset + bytes.length); appendIndex++; } @@ -1102,6 +1108,9 @@ public class VectorColumn { childColumns[0].appendValue(v); } reserve(appendIndex + 1); + if (startOffset + length < 0) { + throw new RuntimeException("Array overflow, offset=" + startOffset + ", length=" + length); + } OffHeap.putLong(null, offsets + 8L * appendIndex, startOffset + length); return appendIndex++; } @@ -1158,6 +1167,9 @@ public class VectorColumn { childColumns[1].appendValue(v); } reserve(appendIndex + 1); + if (startOffset + length < 0) { + throw new RuntimeException("Map overflow, offset=" + startOffset + ", length=" + length); + } OffHeap.putLong(null, offsets + 8L * appendIndex, startOffset + length); return appendIndex++; } diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java index 849f6fb67b..be1505c4ef 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java @@ -30,7 +30,6 @@ import org.apache.paimon.reader.RecordReader; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.Split; -import org.apache.paimon.types.DataType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +42,7 @@ import java.util.stream.Collectors; public class PaimonJniScanner extends JniScanner { private static final Logger LOG = LoggerFactory.getLogger(PaimonJniScanner.class); private static final String PAIMON_OPTION_PREFIX = "paimon_option_prefix."; + private final Map params; private final Map paimonOptionParams; private final String dbName; private final String tblName; @@ -61,6 +61,13 @@ public class PaimonJniScanner extends JniScanner { public PaimonJniScanner(int batchSize, Map params) { LOG.debug("params:{}", params); + this.params = params; + String[] requiredFields = params.get("required_fields").split(","); + String[] requiredTypes = params.get("columns_types").split("#"); + ColumnType[] columnTypes = new ColumnType[requiredTypes.length]; + for (int i = 0; i < requiredTypes.length; i++) { + columnTypes[i] = ColumnType.parseType(requiredFields[i], requiredTypes[i]); + } paimonSplit = params.get("paimon_split"); paimonPredicate = params.get("paimon_predicate"); dbName = params.get("db_name"); @@ -69,21 +76,17 @@ public class PaimonJniScanner extends JniScanner { dbId = Long.parseLong(params.get("db_id")); tblId = Long.parseLong(params.get("tbl_id")); lastUpdateTime = Long.parseLong(params.get("last_update_time")); - super.batchSize = batchSize; - super.fields = params.get("paimon_column_names").split(","); - super.predicates = new ScanPredicate[0]; + initTableInfo(columnTypes, requiredFields, new ScanPredicate[0], batchSize); paimonOptionParams = params.entrySet().stream() .filter(kv -> kv.getKey().startsWith(PAIMON_OPTION_PREFIX)) .collect(Collectors .toMap(kv1 -> kv1.getKey().substring(PAIMON_OPTION_PREFIX.length()), kv1 -> kv1.getValue())); - } @Override public void open() throws IOException { initTable(); initReader(); - parseRequiredTypes(); } private void initReader() throws IOException { @@ -99,30 +102,16 @@ public class PaimonJniScanner extends JniScanner { private List getPredicates() { List predicates = PaimonScannerUtils.decodeStringToObject(paimonPredicate); - LOG.info("predicates:{}", predicates); + LOG.debug("predicates:{}", predicates); return predicates; } private Split getSplit() { Split split = PaimonScannerUtils.decodeStringToObject(paimonSplit); - LOG.info("split:{}", split); + LOG.debug("split:{}", split); return split; } - private void parseRequiredTypes() { - ColumnType[] columnTypes = new ColumnType[fields.length]; - for (int i = 0; i < fields.length; i++) { - int index = paimonAllFieldNames.indexOf(fields[i]); - if (index == -1) { - throw new RuntimeException(String.format("Cannot find field %s in schema %s", - fields[i], paimonAllFieldNames)); - } - DataType dataType = table.rowType().getTypeAt(index); - columnTypes[i] = PaimonTypeUtils.fromPaimonType(fields[i], dataType); - } - super.types = columnTypes; - } - @Override public void close() throws IOException { if (reader != null) { @@ -154,9 +143,12 @@ public class PaimonJniScanner extends JniScanner { recordIterator.releaseBatch(); recordIterator = reader.readBatch(); } - } catch (IOException e) { - LOG.warn("failed to getNext columnValue ", e); - throw new RuntimeException(e); + } catch (Exception e) { + close(); + LOG.warn("Failed to get the next batch of paimon. " + + "split: {}, requiredFieldNames: {}, paimonAllFieldNames: {}", + getSplit(), params.get("required_fields"), paimonAllFieldNames, e); + throw new IOException(e); } return rows; } @@ -178,7 +170,7 @@ public class PaimonJniScanner extends JniScanner { } this.table = tableExt.getTable(); paimonAllFieldNames = PaimonScannerUtils.fieldNames(this.table.rowType()); - LOG.info("paimonAllFieldNames:{}", paimonAllFieldNames); + LOG.debug("paimonAllFieldNames:{}", paimonAllFieldNames); } }