bp: #44274
@@ -40,7 +40,8 @@ const std::string PaimonJniReader::HADOOP_OPTION_PREFIX = "hadoop.";
 PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
                                  RuntimeState* state, RuntimeProfile* profile,
-                                 const TFileRangeDesc& range)
+                                 const TFileRangeDesc& range,
+                                 const TFileScanRangeParams* range_params)
         : JniReader(file_slot_descs, state, profile) {
     std::vector<std::string> column_names;
     std::vector<std::string> column_types;
@@ -61,8 +62,8 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
             std::to_string(range.table_format_params.paimon_params.last_update_time);
     params["required_fields"] = join(column_names, ",");
     params["columns_types"] = join(column_types, "#");
-    if (range.table_format_params.paimon_params.__isset.paimon_table) {
-        params["paimon_table"] = range.table_format_params.paimon_params.paimon_table;
+    if (range_params->__isset.serialized_table) {
+        params["serialized_table"] = range_params->serialized_table;
     }

     // Used to create paimon option
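For orientation, the reader hands its settings to the Java scanner as a flat string map. Below is a minimal Java mock of that contract, using only key names visible in the hunk above; the values and the wrapper class are illustrative, not part of the patch:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ParamMapSketch {
    public static void main(String[] args) {
        // Mirrors the map the C++ reader fills above; values are made up.
        Map<String, String> params = new HashMap<>();
        params.put("required_fields", String.join(",", List.of("id", "name")));
        params.put("columns_types", String.join("#", List.of("INT", "STRING")));
        // Only present when the FE shipped a serialized table (the new path):
        params.put("serialized_table", "<base64-encoded table>");
        System.out.println(params);
    }
}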
@@ -53,7 +53,8 @@ public:
     static const std::string PAIMON_OPTION_PREFIX;
     static const std::string HADOOP_OPTION_PREFIX;
     PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
-                    RuntimeProfile* profile, const TFileRangeDesc& range);
+                    RuntimeProfile* profile, const TFileRangeDesc& range,
+                    const TFileScanRangeParams* range_params);

     ~PaimonJniReader() override = default;
@@ -812,8 +812,8 @@ Status VFileScanner::_get_next_reader() {
         _cur_reader = std::move(mc_reader);
     } else if (range.__isset.table_format_params &&
                range.table_format_params.table_format_type == "paimon") {
-        _cur_reader =
-                PaimonJniReader::create_unique(_file_slot_descs, _state, _profile, range);
+        _cur_reader = PaimonJniReader::create_unique(_file_slot_descs, _state, _profile,
+                                                     range, _params);
         init_status = ((PaimonJniReader*)(_cur_reader.get()))
                               ->init_reader(_colname_to_value_range);
     } else if (range.__isset.table_format_params &&
@@ -224,8 +224,8 @@ public class PaimonJniScanner extends JniScanner {
     }

     private void initTable() {
-        if (params.containsKey("paimon_table")) {
-            table = PaimonUtils.deserialize(params.get("paimon_table"));
+        if (params.containsKey("serialized_table")) {
+            table = PaimonUtils.deserialize(params.get("serialized_table"));
         } else {
             PaimonTableCacheKey key = new PaimonTableCacheKey(ctlId, dbId, tblId,
                     paimonOptionParams, hadoopOptionParams, dbName, tblName);
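The scanner now prefers a table object serialized on the FE over rebuilding one from the catalog cache. Below is a minimal sketch of that round trip, assuming the two helpers wrap standard Java serialization plus Base64 — an assumption for illustration; the real encodeObjectToString and PaimonUtils.deserialize in Doris may encode differently:

import java.io.*;
import java.util.Base64;

public class SerializedTableRoundTrip {
    // Hypothetical stand-in for the FE-side encodeObjectToString helper.
    static String encodeObjectToString(Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj); // serialize the table object
        }
        return Base64.getEncoder().encodeToString(bos.toByteArray());
    }

    // Hypothetical stand-in for the scanner-side PaimonUtils.deserialize.
    @SuppressWarnings("unchecked")
    static <T> T deserialize(String encoded) throws IOException, ClassNotFoundException {
        byte[] bytes = Base64.getDecoder().decode(encoded);
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        String table = "mock-paimon-table";        // stand-in for the Paimon Table
        String wire = encodeObjectToString(table); // FE side
        String back = deserialize(wire);           // BE/JNI scanner side
        System.out.println(back.equals(table));    // true
    }
}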
@@ -76,6 +76,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;

 /**
  * FileQueryScanNode for querying the file access type of catalog, now only support
@@ -261,6 +262,11 @@ public abstract class FileQueryScanNode extends FileScanNode {
     protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
     }

+    // Serialize the table to be scanned to BE's jni reader
+    protected Optional<String> getSerializedTable() {
+        return Optional.empty();
+    }
+
     @Override
     public void createScanRangeLocations() throws UserException {
         long start = System.currentTimeMillis();
@@ -366,6 +372,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
             }
         }

+        getSerializedTable().ifPresent(params::setSerializedTable);
+
         if (ConnectContext.get().getExecutor() != null) {
             ConnectContext.get().getExecutor().getSummaryProfile().setCreateScanRangeFinishTime();
         }
@@ -102,6 +102,7 @@ public class PaimonScanNode extends FileQueryScanNode {
     private int paimonSplitNum = 0;
     private List<SplitStat> splitStats = new ArrayList<>();
     private SessionVariable sessionVariable;
+    private String serializedTable;

     public PaimonScanNode(PlanNodeId id,
             TupleDescriptor desc,
@@ -115,6 +116,7 @@ public class PaimonScanNode extends FileQueryScanNode {
     protected void doInitialize() throws UserException {
         super.doInitialize();
         source = new PaimonSource(desc);
+        serializedTable = encodeObjectToString(source.getPaimonTable());
         Preconditions.checkNotNull(source);
     }
@@ -144,6 +146,11 @@ public class PaimonScanNode extends FileQueryScanNode {
         }
     }

+    @Override
+    protected Optional<String> getSerializedTable() {
+        return Optional.of(serializedTable);
+    }
+
     private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) {
         TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
         tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value());
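Taken together, the FE hunks form a small template-method hook: FileQueryScanNode exposes an Optional that defaults to empty, and format-specific nodes such as PaimonScanNode opt in. Below is a self-contained sketch of that shape with stand-in class names (hypothetical, for illustration only):

import java.util.Optional;

// Stand-ins for FileQueryScanNode / PaimonScanNode showing the hook shape.
abstract class BaseScanNode {
    // Default: formats without FE-side table serialization send nothing.
    protected Optional<String> getSerializedTable() {
        return Optional.empty();
    }

    // Mirrors createScanRangeLocations(): set the field only when present.
    String buildParams() {
        StringBuilder params = new StringBuilder("params");
        getSerializedTable().ifPresent(t -> params.append(", serialized_table=").append(t));
        return params.toString();
    }
}

class PaimonLikeScanNode extends BaseScanNode {
    private final String serializedTable = "<encoded table>"; // set in doInitialize()

    @Override
    protected Optional<String> getSerializedTable() {
        return Optional.of(serializedTable);
    }
}

public class HookSketch {
    public static void main(String[] args) {
        System.out.println(new PaimonLikeScanNode().buildParams()); // field set
        System.out.println(new BaseScanNode() {}.buildParams());    // field omitted
    }
}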
@@ -330,7 +330,7 @@ struct TPaimonFileDesc {
     11: optional string file_format
     12: optional TPaimonDeletionFileDesc deletion_file;
     13: optional map<string, string> hadoop_conf // deprecated
-    14: optional string paimon_table
+    14: optional string paimon_table // deprecated
 }

 struct TMaxComputeFileDesc {
@@ -420,6 +420,13 @@ struct TFileScanRangeParams {
     20: optional list<Exprs.TExpr> pre_filter_exprs_list
     21: optional Types.TUniqueId load_id
     22: optional TTextSerdeType text_serde_type
     // used by flexible partial update
     23: optional string sequence_map_col
+    // table from FE, used for jni scanner
+    // BE can use the table directly:
+    // 1. Reduce the access to HMS and HDFS on the JNI side.
+    // 2. There will be no inconsistency between the FE and BE tables.
+    24: optional string serialized_table
 }

 struct TFileRangeDesc {