[fix](jni) remove 'push_down_predicates' and fix BE crash with decimal predicate (#32253) (#32599)

This commit is contained in:
Mingyu Chen
2024-03-21 13:48:51 +08:00
committed by yiguolei
parent a7fa9f290c
commit 4c8aaa156a
8 changed files with 29 additions and 52 deletions

View File

@ -102,13 +102,16 @@ Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) {
Status JniConnector::init(
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
_generate_predicates(colname_to_value_range);
if (_predicates_length != 0 && _predicates != nullptr) {
int64_t predicates_address = (int64_t)_predicates.get();
// We can call org.apache.doris.common.jni.vec.ScanPredicate#parseScanPredicates to parse the
// serialized predicates in java side.
_scanner_params.emplace("push_down_predicates", std::to_string(predicates_address));
}
// TODO: This logic need to be changed.
// See the comment of "predicates" field in JniScanner.java
// _generate_predicates(colname_to_value_range);
// if (_predicates_length != 0 && _predicates != nullptr) {
// int64_t predicates_address = (int64_t)_predicates.get();
// // We can call org.apache.doris.common.jni.vec.ScanPredicate#parseScanPredicates to parse the
// // serialized predicates in java side.
// _scanner_params.emplace("push_down_predicates", std::to_string(predicates_address));
// }
return Status::OK();
}

View File

@ -164,6 +164,9 @@ public:
char_ptr += s->size;
}
} else {
// FIXME: it can not handle decimal type correctly.
// but this logic is deprecated and not used.
// so may be deleted or fixed later.
for (const CppType* v : values) {
int type_len = sizeof(CppType);
*reinterpret_cast<int*>(char_ptr) = type_len;

View File

@ -19,7 +19,6 @@ package org.apache.doris.avro;
import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ScanPredicate;
import org.apache.doris.common.jni.vec.TableSchema;
import org.apache.doris.thrift.TFileType;
@ -173,7 +172,7 @@ public class AvroJNIScanner extends JniScanner {
try {
initAvroFileContext();
initFieldInspector();
initTableInfo(requiredTypes, requiredFields, new ScanPredicate[0], fetchSize);
initTableInfo(requiredTypes, requiredFields, fetchSize);
} catch (Exception e) {
LOG.warn("Failed to init avro scanner. ", e);
throw new RuntimeException(e);

View File

@ -20,7 +20,6 @@ package org.apache.doris.hudi;
import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ScanPredicate;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopUGI;
@ -59,7 +58,6 @@ public class HudiJniScanner extends JniScanner {
private final int fetchSize;
private final String debugString;
private final HoodieSplit split;
private final ScanPredicate[] predicates;
private final ClassLoader classLoader;
private long getRecordReaderTimeNs = 0;
@ -123,20 +121,8 @@ public class HudiJniScanner extends JniScanner {
.collect(Collectors.joining("\n"));
try {
this.classLoader = this.getClass().getClassLoader();
String predicatesAddressString = params.remove("push_down_predicates");
this.fetchSize = fetchSize;
this.split = new HoodieSplit(params);
if (predicatesAddressString == null) {
predicates = new ScanPredicate[0];
} else {
long predicatesAddress = Long.parseLong(predicatesAddressString);
if (predicatesAddress != 0) {
predicates = ScanPredicate.parseScanPredicates(predicatesAddress, split.requiredTypes());
LOG.info("HudiJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates));
} else {
predicates = new ScanPredicate[0];
}
}
} catch (Exception e) {
LOG.error("Failed to initialize hudi scanner, split params:\n" + debugString, e);
throw e;
@ -147,7 +133,7 @@ public class HudiJniScanner extends JniScanner {
public void open() throws IOException {
Future<?> avroFuture = avroReadPool.submit(() -> {
Thread.currentThread().setContextClassLoader(classLoader);
initTableInfo(split.requiredTypes(), split.requiredFields(), predicates, fetchSize);
initTableInfo(split.requiredTypes(), split.requiredFields(), fetchSize);
long startTime = System.nanoTime();
// RecordReader will use ProcessBuilder to start a hotspot process, which may be stuck,
// so use another process to kill this stuck process.

View File

@ -33,6 +33,14 @@ public abstract class JniScanner {
protected VectorTable vectorTable;
protected String[] fields;
protected ColumnType[] types;
@Deprecated
// This predicate is from BE, but no used.
// TODO: actually, we can generate the predicate for JNI scanner in FE's planner,
// then serialize it to BE, and BE pass it to JNI scanner directly.
// NO need to use this intermediate expression, because each JNI scanner has its
// own predicate expression format.
// For example, Paimon use "PaimonScannerUtils.decodeStringToObject(paimonPredicate)"
// to deserialize the predicate string to PaimonPredicate object.
protected ScanPredicate[] predicates;
protected int batchSize;
@ -50,11 +58,9 @@ public abstract class JniScanner {
throw new UnsupportedOperationException();
}
protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, ScanPredicate[] predicates,
int batchSize) {
protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, int batchSize) {
this.types = requiredTypes;
this.fields = requiredFields;
this.predicates = predicates;
this.batchSize = batchSize;
}

View File

@ -20,7 +20,6 @@ package org.apache.doris.common.jni;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ColumnValue;
import org.apache.doris.common.jni.vec.ScanPredicate;
import org.apache.log4j.Logger;
@ -187,15 +186,7 @@ public class MockJniScanner extends JniScanner {
for (int i = 0; i < types.length; i++) {
columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]);
}
ScanPredicate[] predicates = new ScanPredicate[0];
if (params.containsKey("push_down_predicates")) {
long predicatesAddress = Long.parseLong(params.get("push_down_predicates"));
if (predicatesAddress != 0) {
predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes);
LOG.info("MockJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates));
}
}
initTableInfo(columnTypes, requiredFields, predicates, batchSize);
initTableInfo(columnTypes, requiredFields, batchSize);
}
@Override

View File

@ -19,7 +19,6 @@ package org.apache.doris.maxcompute;
import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ScanPredicate;
import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsType;
@ -99,15 +98,7 @@ public class MaxComputeJniScanner extends JniScanner {
for (int i = 0; i < types.length; i++) {
columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]);
}
ScanPredicate[] predicates = new ScanPredicate[0];
if (params.containsKey("push_down_predicates")) {
long predicatesAddress = Long.parseLong(params.get("push_down_predicates"));
if (predicatesAddress != 0) {
predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes);
LOG.info("MaxComputeJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates));
}
}
initTableInfo(columnTypes, requiredFields, predicates, batchSize);
initTableInfo(columnTypes, requiredFields, batchSize);
}
public void refreshTableScan() {
@ -133,9 +124,8 @@ public class MaxComputeJniScanner extends JniScanner {
}
@Override
protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, ScanPredicate[] predicates,
int batchSize) {
super.initTableInfo(requiredTypes, requiredFields, predicates, batchSize);
protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, int batchSize) {
super.initTableInfo(requiredTypes, requiredFields, batchSize);
readColumns = new ArrayList<>();
readColumnsToId = new HashMap<>();
for (int i = 0; i < fields.length; i++) {

View File

@ -19,7 +19,6 @@ package org.apache.doris.paimon;
import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ScanPredicate;
import org.apache.doris.common.jni.vec.TableSchema;
import org.apache.doris.paimon.PaimonTableCache.PaimonTableCacheKey;
import org.apache.doris.paimon.PaimonTableCache.TableExt;
@ -82,7 +81,7 @@ public class PaimonJniScanner extends JniScanner {
dbId = Long.parseLong(params.get("db_id"));
tblId = Long.parseLong(params.get("tbl_id"));
lastUpdateTime = Long.parseLong(params.get("last_update_time"));
initTableInfo(columnTypes, requiredFields, new ScanPredicate[0], batchSize);
initTableInfo(columnTypes, requiredFields, batchSize);
paimonOptionParams = params.entrySet().stream()
.filter(kv -> kv.getKey().startsWith(PAIMON_OPTION_PREFIX))
.collect(Collectors