[opt](routine load) support routine load perceiving schema change (#39412) (#40508)

pick #39412

At present, a routine load job cannot perceive changes to the table structure. As a long-running load, it should be able to pick up schema changes while it runs.
Author: hui lai
Date: 2024-09-10 11:05:58 +08:00
Committed by: GitHub
Parent: 185353e890
Commit: 8eda15ae16

6 changed files with 170 additions and 21 deletions
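The diffs below move planner construction from job preparation into each task's re-planning step, so every task builds its plan against the table's current schema instead of against a planner cached at job start. The pattern, reduced to a self-contained sketch (Catalog, LongRunningJob, and all other names here are hypothetical stand-ins for illustration, not Doris classes):

    import java.util.List;
    import java.util.concurrent.atomic.AtomicReference;

    // Hypothetical catalog holding the live table schema.
    class Catalog {
        private final AtomicReference<List<String>> columns =
                new AtomicReference<>(List.of("k1", "v1"));

        List<String> currentColumns() { return columns.get(); }    // always the latest schema
        void alterTable(List<String> next) { columns.set(next); }  // simulates ALTER TABLE
    }

    // Hypothetical long-running job, standing in for a routine load job.
    class LongRunningJob {
        private final Catalog catalog;

        LongRunningJob(Catalog catalog) { this.catalog = catalog; }

        // Re-plan per task: the schema is re-read from the catalog on every call,
        // so rollups/schema changes made after job creation are picked up.
        // (A plan built once at job-prepare time would keep the old schema.)
        String planTask(long txnId) {
            return "plan(txn=" + txnId + ", columns=" + catalog.currentColumns() + ")";
        }
    }

    public class ReplanPerTaskDemo {
        public static void main(String[] args) {
            Catalog catalog = new Catalog();
            LongRunningJob job = new LongRunningJob(catalog);
            System.out.println(job.planTask(1));  // columns=[k1, v1]
            catalog.alterTable(List.of("k1", "v1", "v2"));
            System.out.println(job.planTask(2));  // columns=[k1, v1, v2]
        }
    }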

KafkaRoutineLoadJob.java

@@ -166,7 +166,6 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     @Override
     public void prepare() throws UserException {
-        super.prepare();
         // should reset converted properties each time the job being prepared.
         // because the file info can be changed anytime.
         convertCustomProperties(true);
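(The super.prepare() call is dropped because the base-class prepare() becomes abstract in RoutineLoadJob.java below and no longer initializes a shared planner.)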

KafkaTaskInfo.java

@@ -19,10 +19,12 @@ package org.apache.doris.load.routineload;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.planner.StreamLoadPlanner;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TKafkaLoadInfo;
@@ -131,7 +133,11 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
     private TExecPlanFragmentParams rePlan(RoutineLoadJob routineLoadJob) throws UserException {
         TUniqueId loadId = new TUniqueId(id.getMostSignificantBits(), id.getLeastSignificantBits());
         // plan for each task, in case table has change(rollup or schema change)
-        TExecPlanFragmentParams tExecPlanFragmentParams = routineLoadJob.plan(loadId, txnId);
+        Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(routineLoadJob.getDbId());
+        StreamLoadPlanner planner = new StreamLoadPlanner(db,
+                (OlapTable) db.getTableOrMetaException(routineLoadJob.getTableId(),
+                        Table.TableType.OLAP), routineLoadJob);
+        TExecPlanFragmentParams tExecPlanFragmentParams = routineLoadJob.plan(planner, loadId, txnId);
         TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
         tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
         // it needs update timeout to make task timeout backoff work
@@ -164,7 +170,11 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
     private TPipelineFragmentParams rePlanForPipeline(RoutineLoadJob routineLoadJob) throws UserException {
         TUniqueId loadId = new TUniqueId(id.getMostSignificantBits(), id.getLeastSignificantBits());
         // plan for each task, in case table has change(rollup or schema change)
-        TPipelineFragmentParams tExecPlanFragmentParams = routineLoadJob.planForPipeline(loadId, txnId);
+        Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(routineLoadJob.getDbId());
+        StreamLoadPlanner planner = new StreamLoadPlanner(db,
+                (OlapTable) db.getTableOrMetaException(routineLoadJob.getTableId(),
+                        Table.TableType.OLAP), routineLoadJob);
+        TPipelineFragmentParams tExecPlanFragmentParams = routineLoadJob.planForPipeline(planner, loadId, txnId);
         TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
         tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
         // it needs update timeout to make task timeout backoff work
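Both the non-pipeline (rePlan) and pipeline (rePlanForPipeline) paths now build a fresh StreamLoadPlanner from the catalog for every task, so the generated plan reflects the table as it exists at scheduling time, at the cost of one extra catalog lookup and planner construction per task.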

RoutineLoadJob.java

@@ -240,9 +240,6 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
     // The tasks belong to this job
     protected List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Lists.newArrayList();
-    // stream load planer will be initialized during job schedule
-    protected StreamLoadPlanner planner;
     // this is the origin stmt of CreateRoutineLoadStmt, we use it to persist the RoutineLoadJob,
     // because we can not serialize the Expressions contained in job.
     protected OriginStatement origStmt;
@@ -911,21 +908,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
     // call before first scheduling
     // derived class can override this.
-    public void prepare() throws UserException {
-        initPlanner();
-    }
+    public abstract void prepare() throws UserException;
-    private void initPlanner() throws UserException {
-        // for multi table load job, the table name is dynamic,we will set table when task scheduling.
-        if (isMultiTable) {
-            return;
-        }
-        Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
-        planner = new StreamLoadPlanner(db,
-                (OlapTable) db.getTableOrMetaException(this.tableId, Table.TableType.OLAP), this);
-    }
-    public TExecPlanFragmentParams plan(TUniqueId loadId, long txnId) throws UserException {
+    public TExecPlanFragmentParams plan(StreamLoadPlanner planner, TUniqueId loadId, long txnId) throws UserException {
         Preconditions.checkNotNull(planner);
         Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
         Table table = db.getTableOrMetaException(tableId, Table.TableType.OLAP);
@@ -948,7 +933,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         }
     }
-    public TPipelineFragmentParams planForPipeline(TUniqueId loadId, long txnId) throws UserException {
+    public TPipelineFragmentParams planForPipeline(StreamLoadPlanner planner, TUniqueId loadId, long txnId)
+            throws UserException {
         Preconditions.checkNotNull(planner);
         Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
         Table table = db.getTableOrMetaException(tableId, Table.TableType.OLAP);
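With the job-level planner field removed, plan() and planForPipeline() take the planner as a parameter, and prepare() becomes abstract: there is no shared planner left to initialize, and each concrete job (such as KafkaRoutineLoadJob above) supplies its own preparation logic while each task constructs its own planner.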