[feature-wip](partial update) PART1: support basic partial write (#17542)

This commit is contained in:
yixiutt
2023-04-28 17:17:57 +08:00
committed by GitHub
parent 718297d3c1
commit aef9355cd3
65 changed files with 31529 additions and 176 deletions

View File

@@ -542,7 +542,8 @@ public class Load {
*/
public static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
Map<String, Pair<String, List<String>>> columnToHadoopFunction) throws UserException {
initColumns(tbl, columnExprs, columnToHadoopFunction, null, null, null, null, null, null, null, false, false);
initColumns(tbl, columnExprs, columnToHadoopFunction, null, null, null, null, null, null, null, false, false,
false);
}
/*
@@ -552,11 +553,12 @@ public class Load {
public static void initColumns(Table tbl, LoadTaskInfo.ImportColumnDescs columnDescs,
Map<String, Pair<String, List<String>>> columnToHadoopFunction, Map<String, Expr> exprsByName,
Analyzer analyzer, TupleDescriptor srcTupleDesc, Map<String, SlotDescriptor> slotDescByName,
List<Integer> srcSlotIds, TFileFormatType formatType, List<String> hiddenColumns, boolean useVectorizedLoad)
List<Integer> srcSlotIds, TFileFormatType formatType, List<String> hiddenColumns, boolean useVectorizedLoad,
boolean isPartialUpdate)
throws UserException {
rewriteColumns(columnDescs);
initColumns(tbl, columnDescs.descs, columnToHadoopFunction, exprsByName, analyzer, srcTupleDesc, slotDescByName,
srcSlotIds, formatType, hiddenColumns, useVectorizedLoad, true);
srcSlotIds, formatType, hiddenColumns, useVectorizedLoad, true, isPartialUpdate);
}
/*
@@ -571,7 +573,7 @@ public class Load {
Map<String, Pair<String, List<String>>> columnToHadoopFunction, Map<String, Expr> exprsByName,
Analyzer analyzer, TupleDescriptor srcTupleDesc, Map<String, SlotDescriptor> slotDescByName,
List<Integer> srcSlotIds, TFileFormatType formatType, List<String> hiddenColumns, boolean useVectorizedLoad,
boolean needInitSlotAndAnalyzeExprs) throws UserException {
boolean needInitSlotAndAnalyzeExprs, boolean isPartialUpdate) throws UserException {
// We make a copy of the columnExprs so that our subsequent changes
// to the columnExprs will not affect the original columnExprs.
// skip the mapping columns not exist in schema
@@ -635,9 +637,10 @@ public class Load {
if (columnExprMap.containsKey(columnName)) {
continue;
}
if (column.getDefaultValue() != null || column.isAllowNull()) {
if (column.getDefaultValue() != null || column.isAllowNull() || isPartialUpdate) {
continue;
}
//continue;
throw new DdlException("Column has no default value. column: " + columnName);
}

View File

@@ -587,6 +587,11 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
return null;
}
@Override
// Routine load never carries a partial-update flag: this override
// unconditionally reports false, so routine load rows always go through
// the full-row write path.
public boolean isPartialUpdate() {
return false;
}
@Override
public ImportColumnDescs getColumnExprDescs() {
if (columnDescs == null) {

View File

@@ -109,9 +109,9 @@ public class FileLoadScanNode extends FileScanNode {
// Only for stream load/routine load job.
public void setLoadInfo(TUniqueId loadId, long txnId, Table targetTable, BrokerDesc brokerDesc,
BrokerFileGroup fileGroup, TBrokerFileStatus fileStatus, boolean strictMode,
TFileType fileType, List<String> hiddenColumns) {
TFileType fileType, List<String> hiddenColumns, boolean isPartialUpdate) {
FileGroupInfo fileGroupInfo = new FileGroupInfo(loadId, txnId, targetTable, brokerDesc,
fileGroup, fileStatus, strictMode, fileType, hiddenColumns);
fileGroup, fileStatus, strictMode, fileType, hiddenColumns, isPartialUpdate);
fileGroupInfos.add(fileGroupInfo);
}

View File

@@ -77,6 +77,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -90,6 +91,9 @@ public class OlapTableSink extends DataSink {
private TupleDescriptor tupleDescriptor;
// specified partition ids.
private List<Long> partitionIds;
// partial update input columns
private boolean isPartialUpdate = false;
private HashSet<String> partialUpdateInputColumns;
// set after init called
private TDataSink tDataSink;
@@ -140,6 +144,11 @@ public class OlapTableSink extends DataSink {
}
}
/**
 * Marks this sink as (not) performing a partial-column update and records
 * which columns the load actually supplies.
 *
 * NOTE: the caller's set is stored directly, without a defensive copy.
 *
 * @param isPartialUpdate whether the load writes only a subset of columns
 * @param columns         names of the columns present in the input
 */
public void setPartialUpdateInputColumns(boolean isPartialUpdate, HashSet<String> columns) {
    this.partialUpdateInputColumns = columns;
    this.isPartialUpdate = isPartialUpdate;
}
// Overwrites the load id stored inside the already-built thrift data sink.
// NOTE(review): assumes tDataSink has been populated before this is called
// (it is documented above as "set after init called") — confirm callers.
public void updateLoadId(TUniqueId newLoadId) {
tDataSink.getOlapTableSink().setLoadId(newLoadId);
}
@@ -231,6 +240,12 @@ public class OlapTableSink extends DataSink {
indexSchema.setIndexesDesc(indexDesc);
schemaParam.addToIndexes(indexSchema);
}
schemaParam.setIsPartialUpdate(isPartialUpdate);
if (isPartialUpdate) {
for (String s : partialUpdateInputColumns) {
schemaParam.addToPartialUpdateInputColumns(s);
}
}
return schemaParam;
}

View File

@@ -70,6 +70,7 @@ import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -137,8 +138,37 @@ public class StreamLoadPlanner {
// construct tuple descriptor, used for scanNode
scanTupleDesc = descTable.createTupleDescriptor("ScanTuple");
boolean negative = taskInfo.getNegative();
// get partial update related info
boolean isPartialUpdate = taskInfo.isPartialUpdate();
if (isPartialUpdate && !destTable.getEnableUniqueKeyMergeOnWrite()) {
throw new UserException("Only unique key merge on write support partial update");
}
HashSet<String> partialUpdateInputColumns = new HashSet<>();
if (isPartialUpdate) {
for (Column col : destTable.getFullSchema()) {
boolean existInExpr = false;
for (ImportColumnDesc importColumnDesc : taskInfo.getColumnExprDescs().descs) {
if (importColumnDesc.getColumnName() != null
&& importColumnDesc.getColumnName().equals(col.getName())) {
if (!col.isVisible()) {
throw new UserException("Partial update should not include invisible column: "
+ col.getName());
}
partialUpdateInputColumns.add(col.getName());
existInExpr = true;
break;
}
}
if (col.isKey() && !existInExpr) {
throw new UserException("Partial update should include all key columns, missing: " + col.getName());
}
}
}
// here we should be full schema to fill the descriptor table
for (Column col : destTable.getFullSchema()) {
if (isPartialUpdate && !partialUpdateInputColumns.contains(col.getName())) {
continue;
}
SlotDescriptor slotDesc = descTable.addSlotDescriptor(tupleDesc);
slotDesc.setIsMaterialized(true);
slotDesc.setColumn(col);
@@ -201,7 +231,8 @@ public class StreamLoadPlanner {
}
// The load id will pass to csv reader to find the stream load context from new load stream manager
fileScanNode.setLoadInfo(loadId, taskInfo.getTxnId(), destTable, BrokerDesc.createForStreamLoad(),
fileGroup, fileStatus, taskInfo.isStrictMode(), taskInfo.getFileType(), taskInfo.getHiddenColumns());
fileGroup, fileStatus, taskInfo.isStrictMode(), taskInfo.getFileType(), taskInfo.getHiddenColumns(),
taskInfo.isPartialUpdate());
scanNode = fileScanNode;
scanNode.init(analyzer);
@@ -222,6 +253,7 @@ public class StreamLoadPlanner {
Config.enable_single_replica_load);
olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout,
taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet());
olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns);
olapTableSink.complete();
// for stream load, we only need one fragment, ScanNode -> DataSink.

View File

@@ -85,6 +85,7 @@ public class FileGroupInfo {
// used for stream load, FILE_LOCAL or FILE_STREAM
private TFileType fileType;
private List<String> hiddenColumns = null;
private boolean isPartialUpdate = false;
// for broker load
public FileGroupInfo(long loadJobId, long txnId, Table targetTable, BrokerDesc brokerDesc,
@@ -106,7 +107,7 @@ public class FileGroupInfo {
// for stream load
public FileGroupInfo(TUniqueId loadId, long txnId, Table targetTable, BrokerDesc brokerDesc,
BrokerFileGroup fileGroup, TBrokerFileStatus fileStatus, boolean strictMode,
TFileType fileType, List<String> hiddenColumns) {
TFileType fileType, List<String> hiddenColumns, boolean isPartialUpdate) {
this.jobType = JobType.STREAM_LOAD;
this.loadId = loadId;
this.txnId = txnId;
@@ -119,6 +120,7 @@ public class FileGroupInfo {
this.strictMode = strictMode;
this.fileType = fileType;
this.hiddenColumns = hiddenColumns;
this.isPartialUpdate = isPartialUpdate;
}
public Table getTargetTable() {
@@ -159,6 +161,10 @@ public class FileGroupInfo {
return hiddenColumns;
}
/**
 * @return true when this file group belongs to a partial-column-update load
 *         (flag is supplied by the stream-load constructor; broker load
 *         leaves it at its default of false).
 */
public boolean isPartialUpdate() {
    return this.isPartialUpdate;
}
public void getFileStatusAndCalcInstance(FederationBackendPolicy backendPolicy) throws UserException {
if (filesAdded == 0) {
throw new UserException("No source file in this table(" + targetTable.getName() + ").");

View File

@@ -207,7 +207,7 @@ public class LoadScanProvider implements FileScanProviderIf {
Load.initColumns(fileGroupInfo.getTargetTable(), columnDescs, context.fileGroup.getColumnToHadoopFunction(),
context.exprMap, analyzer, context.srcTupleDescriptor, context.srcSlotDescByName, srcSlotIds,
formatType(context.fileGroup.getFileFormat(), ""), fileGroupInfo.getHiddenColumns(),
VectorizedUtil.isVectorized());
VectorizedUtil.isVectorized(), fileGroupInfo.isPartialUpdate());
int columnCountFromPath = 0;
if (context.fileGroup.getColumnNamesFromPath() != null) {

View File

@@ -97,6 +97,8 @@ public interface LoadTaskInfo {
List<String> getHiddenColumns();
boolean isPartialUpdate();
// Default: double quotes are not trimmed unless an implementation overrides
// this. Presumably controls stripping of surrounding quotes from CSV field
// values — confirm against the implementing task classes.
default boolean getTrimDoubleQuotes() {
return false;
}

View File

@@ -83,6 +83,8 @@ public class StreamLoadTask implements LoadTaskInfo {
private String headerType = "";
private List<String> hiddenColumns;
private boolean trimDoubleQuotes = false;
private boolean isPartialUpdate = false;
private int skipLines = 0;
private boolean enableProfile = false;
@@ -268,6 +270,11 @@ public class StreamLoadTask implements LoadTaskInfo {
return enableProfile;
}
/**
 * Whether this stream load is a partial-column update; mirrors the
 * partial_update flag parsed from the TStreamLoadPutRequest (defaults to
 * false when the request does not set it).
 */
@Override
public boolean isPartialUpdate() {
    return this.isPartialUpdate;
}
public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException {
StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(),
request.getFileType(), request.getFormatType(),
@@ -376,6 +383,9 @@ public class StreamLoadTask implements LoadTaskInfo {
if (request.isSetEnableProfile()) {
enableProfile = request.isEnableProfile();
}
if (request.isSetPartialUpdate()) {
isPartialUpdate = request.isPartialUpdate();
}
}
// used for stream load