[fix](move-memtable) disable move memtable when light schema change is false (#29362)

This commit is contained in:
HHoflittlefish777
2024-01-04 23:03:35 +08:00
committed by GitHub
parent 99edbaf3cf
commit 9aafcb18bd
6 changed files with 353 additions and 5 deletions

View File

@ -209,13 +209,15 @@ public class BrokerLoadJob extends BulkLoadJob {
List<BrokerFileGroup> brokerFileGroups = entry.getValue();
long tableId = aggKey.getTableId();
OlapTable table = (OlapTable) db.getTableNullable(tableId);
boolean isEnableMemtableOnSinkNode = ((OlapTable) table).getTableProperty().getUseSchemaLightChange()
? this.enableMemTableOnSinkNode : false;
// Generate loading task and init the plan of task
LoadLoadingTask task = new LoadLoadingTask(db, table, brokerDesc,
brokerFileGroups, getDeadlineMs(), getExecMemLimit(),
isStrictMode(), isPartialUpdate(), transactionId, this, getTimeZone(), getTimeout(),
getLoadParallelism(), getSendBatchParallelism(),
getMaxFilterRatio() <= 0, enableProfile ? jobProfile : null, isSingleTabletLoadPerSink(),
useNewLoadScanNode(), getPriority(), enableMemTableOnSinkNode);
useNewLoadScanNode(), getPriority(), isEnableMemtableOnSinkNode);
UUID uuid = UUID.randomUUID();
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());

View File

@ -136,6 +136,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
PhysicalOlapTableSink<?> physicalOlapTableSink;
DataSink sink;
InsertExecutor insertExecutor;
Table targetTable;
TableIf targetTableIf = InsertExecutor.getTargetTable(logicalQuery, ctx);
// should lock target table until we begin transaction.
targetTableIf.readLock();
@ -159,7 +160,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
Preconditions.checkArgument(plan.isPresent(), "insert into command must contain OlapTableSinkNode");
physicalOlapTableSink = plan.get();
Table targetTable = physicalOlapTableSink.getTargetTable();
targetTable = physicalOlapTableSink.getTargetTable();
// check auth
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), targetTable.getQualifiedDbName(), targetTable.getName(),
@ -187,6 +188,10 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
targetTableIf.readUnlock();
}
boolean isEnableMemtableOnSinkNode =
((OlapTable) targetTable).getTableProperty().getUseSchemaLightChange()
? insertExecutor.getCoordinator().getQueryOptions().isEnableMemtableOnSinkNode() : false;
insertExecutor.getCoordinator().getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
executor.setProfileType(ProfileType.LOAD);
// We exposed @StmtExecutor#cancel as a unified entry point for statement interruption
// so we need to set this here

View File

@ -319,8 +319,10 @@ public class StreamLoadPlanner {
queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load);
queryOptions.setBeExecVersion(Config.be_exec_version);
queryOptions.setIsReportSuccess(taskInfo.getEnableProfile());
queryOptions.setEnableMemtableOnSinkNode(taskInfo.isMemtableOnSinkNode());
boolean isEnableMemtableOnSinkNode =
destTable.getTableProperty().getUseSchemaLightChange()
? taskInfo.isMemtableOnSinkNode() : false;
queryOptions.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
params.setQueryOptions(queryOptions);
TQueryGlobals queryGlobals = new TQueryGlobals();
queryGlobals.setNowString(TimeUtils.DATETIME_FORMAT.format(LocalDateTime.now()));
@ -542,7 +544,10 @@ public class StreamLoadPlanner {
queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load);
queryOptions.setBeExecVersion(Config.be_exec_version);
queryOptions.setIsReportSuccess(taskInfo.getEnableProfile());
queryOptions.setEnableMemtableOnSinkNode(taskInfo.isMemtableOnSinkNode());
boolean isEnableMemtableOnSinkNode =
destTable.getTableProperty().getUseSchemaLightChange()
? taskInfo.isMemtableOnSinkNode() : false;
queryOptions.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
pipParams.setQueryOptions(queryOptions);
TQueryGlobals queryGlobals = new TQueryGlobals();

View File

@ -408,6 +408,10 @@ public class Coordinator implements CoordInterface {
return scanRangeNum;
}
public TQueryOptions getQueryOptions() {
return this.queryOptions;
}
public void setQueryId(TUniqueId queryId) {
this.queryId = queryId;
}

View File

@ -79,6 +79,7 @@ import org.apache.doris.analysis.UseStmt;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Table;
@ -1996,6 +1997,13 @@ public class StmtExecutor {
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord);
Table table = insertStmt.getTargetTable();
if (table instanceof OlapTable) {
boolean isEnableMemtableOnSinkNode =
((OlapTable) table).getTableProperty().getUseSchemaLightChange()
? coord.getQueryOptions().isEnableMemtableOnSinkNode() : false;
coord.getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
}
coord.exec();
int execTimeout = context.getExecTimeout();
LOG.debug("Insert {} execution timeout:{}", DebugUtil.printId(context.queryId()), execTimeout);