diff --git a/fe/pom.xml b/fe/pom.xml
index d7227a76cd..874e427319 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -517,6 +517,12 @@ under the License.
            <version>1.7.5</version>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>2.0.0</version>
+        </dependency>
+
diff --git a/fe/src/main/cup/sql_parser.cup b/fe/src/main/cup/sql_parser.cup
index 1e4db913fe..41ca7d8812 100644
--- a/fe/src/main/cup/sql_parser.cup
+++ b/fe/src/main/cup/sql_parser.cup
@@ -969,7 +969,7 @@ help_stmt ::=
// Export statement
export_stmt ::=
- // KW_EXPORT KW_TABLE table_name:tblName opt_using_partition:partitions
+ // KW_EXPORT KW_TABLE table_name:tblName opt_using_partition:partitions
KW_EXPORT KW_TABLE base_table_ref:tblRef
KW_TO STRING_LITERAL:path
opt_properties:properties
diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index 4d445cc68d..5b57002dc1 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -135,6 +135,7 @@ import org.apache.doris.load.LoadChecker;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.LoadJob.JobState;
+import org.apache.doris.load.routineload.RoutineLoad;
import org.apache.doris.master.Checkpoint;
import org.apache.doris.master.MetaHelper;
import org.apache.doris.metric.MetricRepo;
@@ -240,6 +241,7 @@ public class Catalog {
private ConcurrentHashMap<String, Cluster> nameToCluster;
private Load load;
+ private RoutineLoad routineLoad;
private ExportMgr exportMgr;
private Clone clone;
private Alter alter;
@@ -311,7 +313,7 @@ public class Catalog {
private PullLoadJobMgr pullLoadJobMgr;
private BrokerMgr brokerMgr;
-
+
private GlobalTransactionMgr globalTransactionMgr;
private DeployManager deployManager;
@@ -370,6 +372,7 @@ public class Catalog {
this.idToDb = new ConcurrentHashMap<>();
this.fullNameToDb = new ConcurrentHashMap<>();
this.load = new Load();
+ this.routineLoad = new RoutineLoad();
this.exportMgr = new ExportMgr();
this.clone = new Clone();
this.alter = new Alter();
@@ -419,7 +422,7 @@ public class Catalog {
this.auth = new PaloAuth();
this.domainResolver = new DomainResolver(auth);
-
+
this.esStateStore = new EsStateStore();
}
@@ -457,11 +460,11 @@ public class Catalog {
public BrokerMgr getBrokerMgr() {
return brokerMgr;
}
-
+
public static GlobalTransactionMgr getCurrentGlobalTransactionMgr() {
return getCurrentCatalog().globalTransactionMgr;
}
-
+
public GlobalTransactionMgr getGlobalTransactionMgr() {
return globalTransactionMgr;
}
@@ -572,20 +575,20 @@ public class Catalog {
// 5. create es state store
esStateStore.loadTableFromCatalog();
esStateStore.start();
-
+
// 6. start state listener thread
createStateListener();
listener.setName("stateListener");
listener.setInterval(STATE_CHANGE_CHECK_INTERVAL_MS);
listener.start();
-
+
// 7. start txn cleaner thread
createTxnCleaner();
txnCleaner.setName("txnCleaner");
// the clear threads runs every min(transaction_clean_interval_second,stream_load_default_timeout_second)/10
- txnCleaner.setInterval(Math.min(Config.transaction_clean_interval_second,
+ txnCleaner.setInterval(Math.min(Config.transaction_clean_interval_second,
Config.stream_load_default_timeout_second) * 100L);
-
+
}
private void getClusterIdAndRole() throws IOException {
@@ -615,7 +618,7 @@ public class Catalog {
if (!roleFile.exists()) {
// The very first time to start the first node of the cluster.
// It should became a Master node (Master node's role is also FOLLOWER, which means electable)
-
+
// For compatibility. Because this is the very first time to start, so we arbitrarily choose
// a new name for this node
role = FrontendNodeType.FOLLOWER;
@@ -673,7 +676,7 @@ public class Catalog {
} else {
// try to get role and node name from helper node,
// this loop will not end until we get certain role type and name
- while(true) {
+ while (true) {
if (!getFeNodeTypeAndNameFromHelpers()) {
LOG.warn("current node is not added to the group. please add it first. "
+ "sleep 5 seconds and retry, current helper nodes: {}", helperNodes);
@@ -700,7 +703,7 @@ public class Catalog {
Pair<String, Integer> rightHelperNode = helperNodes.get(0);
Storage storage = new Storage(IMAGE_DIR);
- if (roleFile.exists() && (role != storage.getRole() || !nodeName.equals(storage.getNodeName()))
+ if (roleFile.exists() && (role != storage.getRole() || !nodeName.equals(storage.getNodeName()))
|| !roleFile.exists()) {
storage.writeFrontendRoleAndNodeName(role, nodeName);
}
@@ -773,7 +776,7 @@ public class Catalog {
Preconditions.checkState(helperNodes.size() == 1);
LOG.info("finished to get cluster id: {}, role: {} and node name: {}",
- clusterId, role.name(), nodeName);
+ clusterId, role.name(), nodeName);
}
public static String genFeNodeName(String host, int port, boolean isOldStyle) {
@@ -799,7 +802,7 @@ public class Catalog {
conn = (HttpURLConnection) url.openConnection();
if (conn.getResponseCode() != 200) {
LOG.warn("failed to get fe node type from helper node: {}. response code: {}",
- helperNode, conn.getResponseCode());
+ helperNode, conn.getResponseCode());
continue;
}
@@ -1002,10 +1005,10 @@ public class Catalog {
// Clone checker
CloneChecker.getInstance().setInterval(Config.clone_checker_interval_second * 1000L);
CloneChecker.getInstance().start();
-
+
// Publish Version Daemon
publishVersionDaemon.start();
-
+
// Start txn cleaner
txnCleaner.start();
@@ -1234,7 +1237,7 @@ public class Catalog {
long loadImageEndTime = System.currentTimeMillis();
LOG.info("finished load image in " + (loadImageEndTime - loadImageStartTime) + " ms");
}
-
+
private void recreateTabletInvertIndex() {
if (isCheckpointThread()) {
return;
@@ -1390,7 +1393,7 @@ public class Catalog {
param.readFields(dis);
load.setLoadErrorHubInfo(param);
}
-
+
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_45) {
// 4. load delete jobs
int deleteJobSize = dis.readInt();
@@ -1398,14 +1401,14 @@ public class Catalog {
for (int i = 0; i < deleteJobSize; i++) {
long dbId = dis.readLong();
newChecksum ^= dbId;
-
+
int deleteJobCount = dis.readInt();
newChecksum ^= deleteJobCount;
for (int j = 0; j < deleteJobCount; j++) {
LoadJob job = new LoadJob();
job.readFields(dis);
long currentTimeMs = System.currentTimeMillis();
-
+
// Delete the history load jobs that are older than
// LABEL_KEEP_MAX_MS
// This job must be FINISHED or CANCELLED
@@ -1478,7 +1481,7 @@ public class Catalog {
// init job
Database db = getDb(job.getDbId());
// should check job state here because the job is finished but not removed from alter jobs list
- if (db != null && (job.getState() == org.apache.doris.alter.AlterJob.JobState.PENDING
+ if (db != null && (job.getState() == org.apache.doris.alter.AlterJob.JobState.PENDING
|| job.getState() == org.apache.doris.alter.AlterJob.JobState.RUNNING)) {
job.replayInitJob(db);
}
@@ -1532,7 +1535,7 @@ public class Catalog {
@Deprecated
private long loadBackupAndRestoreJob_D(DataInputStream dis, long checksum,
- Class<? extends AbstractBackupJob_D> jobClass) throws IOException {
+ Class<? extends AbstractBackupJob_D> jobClass) throws IOException {
int size = dis.readInt();
long newChecksum = checksum ^ size;
for (int i = 0; i < size; i++) {
@@ -1592,14 +1595,14 @@ public class Catalog {
long newChecksum = checksum ^ size;
UserPropertyMgr tmpUserPropertyMgr = new UserPropertyMgr();
tmpUserPropertyMgr.readFields(dis);
-
+
// transform it. the old UserPropertyMgr is deprecated
tmpUserPropertyMgr.transform(auth);
return newChecksum;
}
return checksum;
}
-
+
public long loadTransactionState(DataInputStream dis, long checksum) throws IOException {
int size = dis.readInt();
long newChecksum = checksum ^ size;
@@ -1669,7 +1672,7 @@ public class Catalog {
long saveImageEndTime = System.currentTimeMillis();
LOG.info("finished save image {} in {} ms. checksum is {}",
- curFile.getAbsolutePath(), (saveImageEndTime - saveImageStartTime), checksum);
+ curFile.getAbsolutePath(), (saveImageEndTime - saveImageStartTime), checksum);
}
public long saveHeader(DataOutputStream dos, long replayedJournalId, long checksum) throws IOException {
@@ -1785,7 +1788,7 @@ public class Catalog {
// 3. load error hub info
LoadErrorHub.Param param = load.getLoadErrorHubInfo();
param.write(dos);
-
+
// 4. save delete load job info
Map<Long, List<LoadJob>> dbToDeleteJobs = load.getDbToDeleteJobs();
int deleteJobSize = dbToDeleteJobs.size();
@@ -1873,7 +1876,7 @@ public class Catalog {
auth.write(dos);
return checksum;
}
-
+
public long saveTransactionState(DataOutputStream dos, long checksum) throws IOException {
int size = globalTransactionMgr.getTransactionNum();
checksum ^= size;
@@ -1914,7 +1917,7 @@ public class Catalog {
}
};
}
-
+
public void createTxnCleaner() {
txnCleaner = new Daemon() {
protected void runOneCycle() {
@@ -2008,85 +2011,85 @@ public class Catalog {
if (formerFeType == FrontendNodeType.INIT) {
switch (feType) {
- case MASTER: {
- try {
- transferToMaster();
- } catch (IOException e) {
- e.printStackTrace();
+ case MASTER: {
+ try {
+ transferToMaster();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ break;
}
- break;
- }
- case UNKNOWN:
- case FOLLOWER:
- case OBSERVER: {
- transferToNonMaster();
- break;
- }
- default:
- break;
+ case UNKNOWN:
+ case FOLLOWER:
+ case OBSERVER: {
+ transferToNonMaster();
+ break;
+ }
+ default:
+ break;
}
return;
}
if (formerFeType == FrontendNodeType.UNKNOWN) {
switch (feType) {
- case MASTER: {
- try {
- transferToMaster();
- } catch (IOException e) {
- e.printStackTrace();
+ case MASTER: {
+ try {
+ transferToMaster();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ break;
}
- break;
- }
- case FOLLOWER:
- case OBSERVER: {
- transferToNonMaster();
- break;
- }
- default:
+ case FOLLOWER:
+ case OBSERVER: {
+ transferToNonMaster();
+ break;
+ }
+ default:
}
return;
}
if (formerFeType == FrontendNodeType.FOLLOWER) {
switch (feType) {
- case MASTER: {
- try {
- transferToMaster();
- } catch (IOException e) {
- e.printStackTrace();
+ case MASTER: {
+ try {
+ transferToMaster();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ break;
}
- break;
- }
- case UNKNOWN:
- case OBSERVER: {
- transferToNonMaster();
- break;
- }
- default:
+ case UNKNOWN:
+ case OBSERVER: {
+ transferToNonMaster();
+ break;
+ }
+ default:
}
return;
}
if (formerFeType == FrontendNodeType.OBSERVER) {
switch (feType) {
- case UNKNOWN: {
- transferToNonMaster();
+ case UNKNOWN: {
+ transferToNonMaster();
break;
- }
- default:
+ }
+ default:
}
return;
}
if (formerFeType == FrontendNodeType.MASTER) {
switch (feType) {
- case UNKNOWN:
- case FOLLOWER:
- case OBSERVER: {
- System.exit(-1);
- }
- default:
+ case UNKNOWN:
+ case FOLLOWER:
+ case OBSERVER: {
+ System.exit(-1);
+ }
+ default:
}
return;
}
@@ -2650,7 +2653,7 @@ public class Catalog {
}
public Pair addPartition(Database db, String tableName, OlapTable givenTable,
- AddPartitionClause addPartitionClause, boolean isRestore) throws DdlException {
+ AddPartitionClause addPartitionClause, boolean isRestore) throws DdlException {
SingleRangePartitionDesc singlePartitionDesc = addPartitionClause.getSingeRangePartitionDesc();
DistributionDesc distributionDesc = addPartitionClause.getDistributionDesc();
@@ -2768,18 +2771,18 @@ public class Catalog {
try {
long partitionId = getNextId();
Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(),
- olapTable.getId(),
- partitionId, partitionName,
- indexIdToShortKeyColumnCount,
- indexIdToSchemaHash,
- indexIdToStorageType,
- indexIdToSchema,
- olapTable.getKeysType(),
- distributionInfo,
- dataProperty.getStorageMedium(),
- singlePartitionDesc.getReplicationNum(),
- versionInfo, bfColumns, olapTable.getBfFpp(),
- tabletIdSet, isRestore);
+ olapTable.getId(),
+ partitionId, partitionName,
+ indexIdToShortKeyColumnCount,
+ indexIdToSchemaHash,
+ indexIdToStorageType,
+ indexIdToSchema,
+ olapTable.getKeysType(),
+ distributionInfo,
+ dataProperty.getStorageMedium(),
+ singlePartitionDesc.getReplicationNum(),
+ versionInfo, bfColumns, olapTable.getBfFpp(),
+ tabletIdSet, isRestore);
// check again
db.writeLock();
@@ -3125,13 +3128,13 @@ public class Catalog {
long backendId = replica.getBackendId();
countDownLatch.addMark(backendId, tabletId);
CreateReplicaTask task = new CreateReplicaTask(backendId, dbId, tableId,
- partitionId, indexId, tabletId,
- shortKeyColumnCount, schemaHash,
- version, versionHash,
- keysType,
- storageType, storageMedium,
- schema, bfColumns, bfFpp,
- countDownLatch);
+ partitionId, indexId, tabletId,
+ shortKeyColumnCount, schemaHash,
+ version, versionHash,
+ keysType,
+ storageType, storageMedium,
+ schema, bfColumns, bfFpp,
+ countDownLatch);
batchTask.addTask(task);
// add to AgentTaskQueue for handling finish report.
// not for resending task
@@ -3298,7 +3301,7 @@ public class Catalog {
DataProperty dataProperty = null;
try {
dataProperty = PropertyAnalyzer.analyzeDataProperty(stmt.getProperties(),
- DataProperty.DEFAULT_HDD_DATA_PROPERTY);
+ DataProperty.DEFAULT_HDD_DATA_PROPERTY);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
@@ -3316,18 +3319,18 @@ public class Catalog {
// create partition
Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(),
- olapTable.getId(),
- partitionId, partitionName,
- olapTable.getIndexIdToShortKeyColumnCount(),
- olapTable.getIndexIdToSchemaHash(),
- olapTable.getIndexIdToStorageType(),
- olapTable.getIndexIdToSchema(),
- keysType,
- distributionInfo,
- dataProperty.getStorageMedium(),
- replicationNum,
- versionInfo, bfColumns, bfFpp,
- tabletIdSet, isRestore);
+ olapTable.getId(),
+ partitionId, partitionName,
+ olapTable.getIndexIdToShortKeyColumnCount(),
+ olapTable.getIndexIdToSchemaHash(),
+ olapTable.getIndexIdToStorageType(),
+ olapTable.getIndexIdToSchema(),
+ keysType,
+ distributionInfo,
+ dataProperty.getStorageMedium(),
+ replicationNum,
+ versionInfo, bfColumns, bfFpp,
+ tabletIdSet, isRestore);
olapTable.addPartition(partition);
} else if (partitionInfo.getType() == PartitionType.RANGE) {
try {
@@ -3349,16 +3352,16 @@ public class Catalog {
for (Map.Entry<String, Long> entry : partitionNameToId.entrySet()) {
DataProperty dataProperty = rangePartitionInfo.getDataProperty(entry.getValue());
Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(),
- entry.getValue(), entry.getKey(),
- olapTable.getIndexIdToShortKeyColumnCount(),
- olapTable.getIndexIdToSchemaHash(),
- olapTable.getIndexIdToStorageType(),
- olapTable.getIndexIdToSchema(),
- keysType, distributionInfo,
- dataProperty.getStorageMedium(),
- partitionInfo.getReplicationNum(entry.getValue()),
- versionInfo, bfColumns, bfFpp,
- tabletIdSet, isRestore);
+ entry.getValue(), entry.getKey(),
+ olapTable.getIndexIdToShortKeyColumnCount(),
+ olapTable.getIndexIdToSchemaHash(),
+ olapTable.getIndexIdToStorageType(),
+ olapTable.getIndexIdToSchema(),
+ keysType, distributionInfo,
+ dataProperty.getStorageMedium(),
+ partitionInfo.getReplicationNum(entry.getValue()),
+ versionInfo, bfColumns, bfFpp,
+ tabletIdSet, isRestore);
olapTable.addPartition(partition);
}
} else {
@@ -3409,14 +3412,14 @@ public class Catalog {
return returnTable;
}
-
+
private Table createEsTable(Database db, CreateTableStmt stmt) throws DdlException {
String tableName = stmt.getTableName();
// create columns
List<Column> baseSchema = stmt.getColumns();
validateColumns(baseSchema);
-
+
// create partition info
PartitionDesc partitionDesc = stmt.getPartitionDesc();
PartitionInfo partitionInfo = null;
@@ -3456,7 +3459,7 @@ public class Catalog {
HashDistributionDesc hashDistri = (HashDistributionDesc) stmt.getDistributionDesc();
kuduCreateOpts.addHashPartitions(hashDistri.getDistributionColumnNames(), hashDistri.getBuckets());
KuduPartition hashPartition = KuduPartition.createHashPartition(hashDistri.getDistributionColumnNames(),
- hashDistri.getBuckets());
+ hashDistri.getBuckets());
// 3. partition, if exist
KuduPartition rangePartition = null;
@@ -3549,8 +3552,8 @@ public class Catalog {
}
public static void getDdlStmt(Table table, List<String> createTableStmt, List<String> addPartitionStmt,
- List<String> createRollupStmt, boolean separatePartition, short replicationNum,
- boolean hidePassword) {
+ List<String> createRollupStmt, boolean separatePartition, short replicationNum,
+ boolean hidePassword) {
StringBuilder sb = new StringBuilder();
// 1. create table
@@ -3565,7 +3568,7 @@ public class Catalog {
// 1.2 other table type
sb.append("CREATE ");
- if (table.getType() == TableType.KUDU || table.getType() == TableType.MYSQL
+ if (table.getType() == TableType.KUDU || table.getType() == TableType.MYSQL
|| table.getType() == TableType.ELASTICSEARCH) {
sb.append("EXTERNAL ");
}
@@ -3708,7 +3711,7 @@ public class Catalog {
sb.append(";");
} else if (table.getType() == TableType.ELASTICSEARCH) {
EsTable esTable = (EsTable) table;
-
+
// partition
PartitionInfo partitionInfo = esTable.getPartitionInfo();
if (partitionInfo.getType() == PartitionType.RANGE) {
@@ -3724,7 +3727,7 @@ public class Catalog {
}
sb.append(")\n()");
}
-
+
// properties
sb.append("\nPROPERTIES (\n");
sb.append("\"host\" = \"").append(esTable.getHosts()).append("\",\n");
@@ -3823,8 +3826,8 @@ public class Catalog {
}
private void createTablets(String clusterName, MaterializedIndex index, ReplicaState replicaState,
- DistributionInfo distributionInfo, long version, long versionHash, short replicationNum,
- TabletMeta tabletMeta, Set<Long> tabletIdSet) throws DdlException {
+ DistributionInfo distributionInfo, long version, long versionHash, short replicationNum,
+ TabletMeta tabletMeta, Set<Long> tabletIdSet) throws DdlException {
Preconditions.checkArgument(replicationNum > 0);
DistributionInfoType distributionInfoType = distributionInfo.getType();
@@ -3865,7 +3868,7 @@ public class Catalog {
if (fullNameToDb.get(dbName) == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName);
}
-
+
Table table = null;
db.writeLock();
try {
@@ -3958,7 +3961,7 @@ public class Catalog {
}
public void handleJobsWhenDeleteReplica(long tableId, long partitionId, long indexId, long tabletId, long replicaId,
- long backendId) {
+ long backendId) {
// rollup
getRollupHandler().removeReplicaRelatedTask(tableId, partitionId, indexId, tabletId, backendId);
@@ -3977,10 +3980,10 @@ public class Catalog {
MaterializedIndex materializedIndex = partition.getIndex(info.getIndexId());
Tablet tablet = materializedIndex.getTablet(info.getTabletId());
Replica replica = new Replica(info.getReplicaId(), info.getBackendId(), info.getVersion(),
- info.getVersionHash(), info.getDataSize(), info.getRowCount(), ReplicaState.NORMAL,
- info.getLastFailedVersion(),
+ info.getVersionHash(), info.getDataSize(), info.getRowCount(), ReplicaState.NORMAL,
+ info.getLastFailedVersion(),
info.getLastFailedVersionHash(),
- info.getLastSuccessVersion(),
+ info.getLastSuccessVersion(),
info.getLastSuccessVersionHash());
tablet.addReplica(replica);
}
@@ -4076,7 +4079,7 @@ public class Catalog {
String clusterName = ClusterNamespace.getClusterNameFromFullName(name);
return fullNameToDb.get(ClusterNamespace.getFullName(clusterName, dbName.toLowerCase()));
}
- }
+ }
return null;
}
@@ -4197,17 +4200,17 @@ public class Catalog {
&& dataProperty.getCooldownTimeMs() < currentTimeMs) {
// expire. change to HDD.
partitionInfo.setDataProperty(partition.getId(),
- DataProperty.DEFAULT_HDD_DATA_PROPERTY);
+ DataProperty.DEFAULT_HDD_DATA_PROPERTY);
storageMediumMap.put(partitionId, TStorageMedium.HDD);
LOG.debug("partition[{}-{}-{}] storage medium changed from SSD to HDD",
- dbId, tableId, partitionId);
+ dbId, tableId, partitionId);
// log
ModifyPartitionInfo info =
new ModifyPartitionInfo(db.getId(), olapTable.getId(),
- partition.getId(),
- DataProperty.DEFAULT_HDD_DATA_PROPERTY,
- (short) -1);
+ partition.getId(),
+ DataProperty.DEFAULT_HDD_DATA_PROPERTY,
+ (short) -1);
editLog.logModifyPartition(info);
}
} // end for partitions
@@ -4247,6 +4250,10 @@ public class Catalog {
return this.load;
}
+ public RoutineLoad getRoutineLoadInstance() {
+ return routineLoad;
+ }
+
public ExportMgr getExportMgr() {
return this.exportMgr;
}
@@ -4320,7 +4327,7 @@ public class Catalog {
}
return this.masterIp;
}
-
+
public EsStateStore getEsStateStore() {
return this.esStateStore;
}
@@ -5029,7 +5036,6 @@ public class Catalog {
}
/**
- *
* @param ctx
* @param clusterName
* @throws DdlException
@@ -5037,7 +5043,7 @@ public class Catalog {
public void changeCluster(ConnectContext ctx, String clusterName) throws DdlException {
if (!Catalog.getCurrentCatalog().getAuth().checkCanEnterCluster(ConnectContext.get(), clusterName)) {
ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_AUTHORITY,
- ConnectContext.get().getQualifiedUser(), "enter");
+ ConnectContext.get().getQualifiedUser(), "enter");
}
if (!nameToCluster.containsKey(clusterName)) {
@@ -5250,6 +5256,7 @@ public class Catalog {
/**
* get migrate progress , when finish migration, next clonecheck will reset dbState
+ *
* @return
*/
public Set getMigrations() {
@@ -5322,14 +5329,14 @@ public class Catalog {
List<Long> latestBackendIds = systemInfo.getClusterBackendIds(cluster.getName());
if (latestBackendIds.size() != cluster.getBackendIdList().size()) {
- LOG.warn("Cluster:" + cluster.getName() + ", backends in Cluster is "
- + cluster.getBackendIdList().size() + ", backends in SystemInfoService is "
- + cluster.getBackendIdList().size());
+ LOG.warn("Cluster:" + cluster.getName() + ", backends in Cluster is "
+ + cluster.getBackendIdList().size() + ", backends in SystemInfoService is "
+ + cluster.getBackendIdList().size());
}
// The number of BE in cluster is not same as in SystemInfoService, when perform 'ALTER
// SYSTEM ADD BACKEND TO ...' or 'ALTER SYSTEM ADD BACKEND ...', because both of them are
// for adding BE to some Cluster, but loadCluster is after loadBackend.
- cluster.setBackendIdList(latestBackendIds);
+ cluster.setBackendIdList(latestBackendIds);
final InfoSchemaDb db = new InfoSchemaDb(cluster.getName());
db.setClusterName(cluster.getName());
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
new file mode 100644
index 0000000000..cced4ddc1e
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+import com.google.common.base.Strings;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.SystemIdGenerator;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TResourceInfo;
+import org.apache.doris.thrift.TTaskType;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+public class KafkaRoutineLoadJob extends RoutineLoadJob {
+ private static final Logger LOG = LogManager.getLogger(KafkaRoutineLoadJob.class);
+
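+ // consumer group id and timeout (in seconds) used by the FE when fetching partition metadata from kafka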
+ private static final String FE_GROUP_ID = "fe_fetch_partitions";
+ private static final int FETCH_PARTITIONS_TIMEOUT = 10;
+
+ private String serverAddress;
+ private String topic;
+ // optional
+ private List<Integer> kafkaPartitions;
+
+ public KafkaRoutineLoadJob() {
+ }
+
+ public KafkaRoutineLoadJob(long id, String name, String userName, long dbId, long tableId,
+ String partitions, String columns, String where, String columnSeparator,
+ int desireTaskConcurrentNum, JobState state, DataSourceType dataSourceType,
+ int maxErrorNum, TResourceInfo resourceInfo, String serverAddress, String topic) {
+ super(id, name, userName, dbId, tableId, partitions, columns, where,
+ columnSeparator, desireTaskConcurrentNum, state, dataSourceType, maxErrorNum, resourceInfo);
+ this.serverAddress = serverAddress;
+ this.topic = topic;
+ }
+
+ @Override
+ public List<RoutineLoadTask> divideRoutineLoadJob(int currentConcurrentTaskNum) {
+ // divide kafkaPartitions into tasks
+ List<KafkaRoutineLoadTask> kafkaRoutineLoadTaskList = new ArrayList<>();
+ for (int i = 0; i < currentConcurrentTaskNum; i++) {
+ // TODO(ml): init load task
+ kafkaRoutineLoadTaskList.add(new KafkaRoutineLoadTask(getResourceInfo(), 0L, TTaskType.PUSH,
+ dbId, tableId, 0L, 0L, 0L, SystemIdGenerator.getNextId()));
+ }
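+ // assign kafka partitions to the tasks in a round-robin way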
+ for (int i = 0; i < kafkaPartitions.size(); i++) {
+ kafkaRoutineLoadTaskList.get(i % currentConcurrentTaskNum).addKafkaPartition(kafkaPartitions.get(i));
+ }
+ List<RoutineLoadTask> result = new ArrayList<>();
+ result.addAll(kafkaRoutineLoadTaskList);
+ return result;
+ }
+
+ @Override
+ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException {
+ updatePartitions();
+ SystemInfoService systemInfoService = Catalog.getCurrentSystemInfo();
+ Database db = Catalog.getCurrentCatalog().getDb(dbId);
+ if (db == null) {
+ LOG.warn("db {} is not exists from job {}", dbId, id);
+ throw new MetaNotFoundException("db " + dbId + " is not exists from job " + id);
+ }
+ String clusterName = db.getClusterName();
+ if (Strings.isNullOrEmpty(clusterName)) {
+ LOG.debug("database {} has no cluster name", dbId);
+ clusterName = SystemInfoService.DEFAULT_CLUSTER;
+ }
+ int aliveBeNum = systemInfoService.getClusterBackendIds(clusterName, true).size();
+ int partitionNum = kafkaPartitions.size();
+
+ LOG.info("current concurrent task number is min "
+ + "(current size of partition {}, desire task concurrent num {}, alive be num {})",
+ partitionNum, desireTaskConcurrentNum, aliveBeNum);
+ return Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum));
+ }
+
+ private void updatePartitions() {
+ // when the user did not specify partitions, fetch all partitions of the topic from kafka
+ if (kafkaPartitions == null || kafkaPartitions.size() == 0) {
+ kafkaPartitions = new ArrayList<>();
+ Properties props = new Properties();
+ props.put("bootstrap.servers", this.serverAddress);
+ props.put("group.id", FE_GROUP_ID);
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
+ List<PartitionInfo> partitionList = consumer.partitionsFor(
+ topic, Duration.ofSeconds(FETCH_PARTITIONS_TIMEOUT));
+ for (PartitionInfo partitionInfo : partitionList) {
+ kafkaPartitions.add(partitionInfo.partition());
+ }
+ }
+ }
+ }
+}
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadProgress.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadProgress.java
new file mode 100644
index 0000000000..ff46af630b
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadProgress.java
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+public class KafkaRoutineLoadProgress {
+
+ private String partitionName;
+ private long offset;
+}
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadTask.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadTask.java
new file mode 100644
index 0000000000..89347745c4
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadTask.java
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+import com.google.common.collect.Lists;
+import org.apache.doris.thrift.TResourceInfo;
+import org.apache.doris.thrift.TTaskType;
+
+import java.util.List;
+
+public class KafkaRoutineLoadTask extends RoutineLoadTask {
+
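+ // a routine load task bound to the set of kafka partitions assigned to it by its job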
+ private List<Integer> kafkaPartitions;
+
+ public KafkaRoutineLoadTask(TResourceInfo resourceInfo, long backendId, TTaskType taskType,
+ long dbId, long tableId, long partitionId, long indexId, long tabletId, long signature) {
+ super(resourceInfo, backendId, taskType, dbId, tableId, partitionId, indexId, tabletId, signature);
+ this.kafkaPartitions = Lists.newArrayList();
+ }
+
+ public void addKafkaPartition(int partition) {
+ kafkaPartitions.add(partition);
+ }
+
+ public List<Integer> getKafkaPartitions() {
+ return kafkaPartitions;
+ }
+}
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoad.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoad.java
new file mode 100644
index 0000000000..0dd6b3cc43
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoad.java
@@ -0,0 +1,264 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+import com.google.common.collect.Maps;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.LoadException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+public class RoutineLoad {
+ private static final Logger LOG = LogManager.getLogger(RoutineLoad.class);
+ private static final int DEFAULT_BE_CONCURRENT_TASK_NUM = 100;
+
+ // TODO(ml): real-time calculate by be
+ private Map<Long, Integer> beIdToMaxConcurrentTasks;
+
+ // routine load job meta
+ private Map<Long, RoutineLoadJob> idToRoutineLoadJob;
+ private Map<Long, RoutineLoadJob> idToNeedSchedulerRoutineLoadJob;
+ private Map<Long, RoutineLoadJob> idToRunningRoutineLoadJob;
+ private Map<Long, RoutineLoadJob> idToCancelledRoutineLoadJob;
+
+ // routine load tasks meta (not persistent)
+ private Map<Long, RoutineLoadTask> idToRoutineLoadTask;
+ private Map<Long, RoutineLoadTask> idToNeedSchedulerRoutineLoadTask;
+
+ private ReentrantReadWriteLock lock;
+
+ private void readLock() {
+ lock.readLock().lock();
+ }
+
+ private void readUnlock() {
+ lock.readLock().unlock();
+ }
+
+ private void writeLock() {
+ lock.writeLock().lock();
+ }
+
+ private void writeUnlock() {
+ lock.writeLock().unlock();
+ }
+
+ public RoutineLoad() {
+ idToRoutineLoadJob = Maps.newHashMap();
+ idToNeedSchedulerRoutineLoadJob = Maps.newHashMap();
+ idToRunningRoutineLoadJob = Maps.newHashMap();
+ idToCancelledRoutineLoadJob = Maps.newHashMap();
+ idToRoutineLoadTask = Maps.newHashMap();
+ idToNeedSchedulerRoutineLoadTask = Maps.newHashMap();
+ lock = new ReentrantReadWriteLock(true);
+ }
+
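+ // total capacity is the sum of the (currently fixed) per-BE max concurrent task num over all alive BEs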
+ public int getTotalMaxConcurrentTaskNum() {
+ readLock();
+ try {
+ if (beIdToMaxConcurrentTasks == null) {
+ beIdToMaxConcurrentTasks = Catalog.getCurrentSystemInfo().getBackendIds(true)
+ .parallelStream().collect(Collectors.toMap(beId -> beId, beId -> DEFAULT_BE_CONCURRENT_TASK_NUM));
+ }
+ return beIdToMaxConcurrentTasks.values().stream().mapToInt(i -> i).sum();
+ } finally {
+ readUnlock();
+ }
+ }
+
+ public void addRoutineLoadJob(RoutineLoadJob routineLoadJob) {
+ writeLock();
+ try {
+ idToRoutineLoadJob.put(routineLoadJob.getId(), routineLoadJob);
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ public void addRoutineLoadTasks(List<RoutineLoadTask> routineLoadTaskList) {
+ writeLock();
+ try {
+ idToRoutineLoadTask.putAll(routineLoadTaskList.parallelStream().collect(
+ Collectors.toMap(task -> task.getSignature(), task -> task)));
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ public Map<Long, RoutineLoadTask> getIdToRoutineLoadTask() {
+ return idToRoutineLoadTask;
+ }
+
+ public void addNeedSchedulerRoutineLoadTasks(List<RoutineLoadTask> routineLoadTaskList) {
+ writeLock();
+ try {
+ idToNeedSchedulerRoutineLoadTask.putAll(routineLoadTaskList.parallelStream().collect(
+ Collectors.toMap(task -> task.getSignature(), task -> task)));
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ public void removeRoutineLoadTasks(List<RoutineLoadTask> routineLoadTasks) {
+ if (routineLoadTasks != null) {
+ writeLock();
+ try {
+ routineLoadTasks.parallelStream().forEach(task -> idToRoutineLoadTask.remove(task.getSignature()));
+ routineLoadTasks.parallelStream().forEach(task ->
+ idToNeedSchedulerRoutineLoadTask.remove(task.getSignature()));
+ } finally {
+ writeUnlock();
+ }
+ }
+ }
+
+ public Map<Long, RoutineLoadTask> getIdToNeedSchedulerRoutineLoadTasks() {
+ readLock();
+ try {
+ return idToNeedSchedulerRoutineLoadTask;
+ } finally {
+ readUnlock();
+ }
+ }
+
+ public List<RoutineLoadJob> getRoutineLoadJobByState(RoutineLoadJob.JobState jobState) throws LoadException {
+ List<RoutineLoadJob> jobs = new ArrayList<>();
+ Collection<RoutineLoadJob> stateJobs = null;
+ readLock();
+ LOG.debug("begin to get routine load job by state {}", jobState.name());
+ try {
+ switch (jobState) {
+ case NEED_SCHEDULER:
+ stateJobs = idToNeedSchedulerRoutineLoadJob.values();
+ break;
+ case PAUSED:
+ throw new LoadException("not support getting paused routine load jobs");
+ case RUNNING:
+ stateJobs = idToRunningRoutineLoadJob.values();
+ break;
+ case STOPPED:
+ throw new LoadException("not support getting stopped routine load jobs");
+ default:
+ break;
+ }
+ if (stateJobs != null) {
+ jobs.addAll(stateJobs);
+ LOG.info("got {} routine load jobs by state {}", jobs.size(), jobState.name());
+ }
+ } finally {
+ readUnlock();
+ }
+ return jobs;
+ }
+
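+ // same as updateRoutineLoadJobState but without checking whether the state transition is valid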
+ public void updateRoutineLoadJobStateNoValid(RoutineLoadJob routineLoadJob, RoutineLoadJob.JobState jobState) {
+ writeLock();
+ try {
+ RoutineLoadJob.JobState srcJobState = routineLoadJob.getState();
+ long jobId = routineLoadJob.getId();
+ LOG.info("begin to change job {} state from {} to {}", jobId, srcJobState, jobState);
+ switch (jobState) {
+ case NEED_SCHEDULER:
+ idToRunningRoutineLoadJob.remove(jobId);
+ idToNeedSchedulerRoutineLoadJob.put(jobId, routineLoadJob);
+ break;
+ case PAUSED:
+ idToNeedSchedulerRoutineLoadJob.remove(jobId);
+ idToRunningRoutineLoadJob.remove(jobId);
+ break;
+ case RUNNING:
+ idToNeedSchedulerRoutineLoadJob.remove(jobId, routineLoadJob);
+ idToRunningRoutineLoadJob.put(jobId, routineLoadJob);
+ break;
+ case CANCELLED:
+ idToRunningRoutineLoadJob.remove(jobId);
+ idToNeedSchedulerRoutineLoadJob.remove(jobId);
+ idToCancelledRoutineLoadJob.put(jobId, routineLoadJob);
+ break;
+ case STOPPED:
+ idToRunningRoutineLoadJob.remove(jobId);
+ idToNeedSchedulerRoutineLoadJob.remove(jobId);
+ break;
+ default:
+ break;
+ }
+ routineLoadJob.setState(jobState);
+ Catalog.getInstance().getEditLog().logRoutineLoadJob(routineLoadJob);
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ public void updateRoutineLoadJobState(RoutineLoadJob routineLoadJob, RoutineLoadJob.JobState jobState)
+ throws LoadException {
+ writeLock();
+ try {
+ RoutineLoadJob.JobState srcJobState = routineLoadJob.getState();
+ long jobId = routineLoadJob.getId();
+ LOG.info("begin to change job {} state from {} to {}", jobId, srcJobState, jobState);
+ checkStateTransform(srcJobState, jobState);
+ switch (jobState) {
+ case NEED_SCHEDULER:
+ idToRunningRoutineLoadJob.remove(jobId);
+ idToNeedSchedulerRoutineLoadJob.put(jobId, routineLoadJob);
+ break;
+ case PAUSED:
+ idToNeedSchedulerRoutineLoadJob.remove(jobId);
+ idToRunningRoutineLoadJob.remove(jobId);
+ break;
+ case RUNNING:
+ idToNeedSchedulerRoutineLoadJob.remove(jobId, routineLoadJob);
+ idToRunningRoutineLoadJob.put(jobId, routineLoadJob);
+ break;
+ case CANCELLED:
+ idToRunningRoutineLoadJob.remove(jobId);
+ idToNeedSchedulerRoutineLoadJob.remove(jobId);
+ idToCancelledRoutineLoadJob.put(jobId, routineLoadJob);
+ break;
+ case STOPPED:
+ idToRunningRoutineLoadJob.remove(jobId);
+ idToNeedSchedulerRoutineLoadJob.remove(jobId);
+ break;
+ default:
+ break;
+ }
+ routineLoadJob.setState(jobState);
+ Catalog.getInstance().getEditLog().logRoutineLoadJob(routineLoadJob);
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ private void checkStateTransform(RoutineLoadJob.JobState currentState, RoutineLoadJob.JobState desireState)
+ throws LoadException {
+ if (currentState == RoutineLoadJob.JobState.PAUSED && desireState == RoutineLoadJob.JobState.NEED_SCHEDULER) {
+ throw new LoadException("could not transform " + currentState + " to " + desireState);
+ } else if (currentState == RoutineLoadJob.JobState.CANCELLED ||
+ currentState == RoutineLoadJob.JobState.STOPPED) {
+ throw new LoadException("could not transform " + currentState + " to " + desireState);
+ }
+ }
+
+}
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
new file mode 100644
index 0000000000..af737356fe
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.thrift.TResourceInfo;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A routine load job continuously loads data from a streaming source into Doris.
+ * It is suitable for load jobs whose data arrives continuously.
+ * Its properties include both stream load properties and job properties.
+ * desireTaskConcurrentNum is the number of concurrent stream load tasks the user expects.
+ * Different streaming sources are supported, such as KAFKA.
+ */
+public class RoutineLoadJob implements Writable {
+
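+ // CANCELLED and STOPPED are terminal states, and a PAUSED job cannot go back to
+ // NEED_SCHEDULER (see RoutineLoad.checkStateTransform)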
+ public enum JobState {
+ NEED_SCHEDULER,
+ RUNNING,
+ PAUSED,
+ STOPPED,
+ CANCELLED
+ }
+
+ public enum DataSourceType {
+ KAFKA
+ }
+
+ protected long id;
+ protected String name;
+ protected String userName;
+ protected long dbId;
+ protected long tableId;
+ protected String partitions;
+ protected String columns;
+ protected String where;
+ protected String columnSeparator;
+ protected int desireTaskConcurrentNum;
+ protected JobState state;
+ protected DataSourceType dataSourceType;
+ // max number of error rows allowed in every ten thousand rows
+ protected int maxErrorNum;
+ protected String progress;
+ protected ReentrantReadWriteLock lock;
+ // TODO(ml): error sample
+
+
+ public RoutineLoadJob() {
+ lock = new ReentrantReadWriteLock(true);
+ }
+
+ public RoutineLoadJob(long id, String name, String userName, long dbId, long tableId,
+ String partitions, String columns, String where, String columnSeparator,
+ int desireTaskConcurrentNum, JobState state, DataSourceType dataSourceType,
+ int maxErrorNum, TResourceInfo resourceInfo) {
+ this.id = id;
+ this.name = name;
+ this.userName = userName;
+ this.dbId = dbId;
+ this.tableId = tableId;
+ this.partitions = partitions;
+ this.columns = columns;
+ this.where = where;
+ this.columnSeparator = columnSeparator;
+ this.desireTaskConcurrentNum = desireTaskConcurrentNum;
+ this.state = state;
+ this.dataSourceType = dataSourceType;
+ this.maxErrorNum = maxErrorNum;
+ this.resourceInfo = resourceInfo;
+ this.progress = "";
+ lock = new ReentrantReadWriteLock(true);
+ }
+
+ public void readLock() {
+ lock.readLock().lock();
+ }
+
+ public void readUnlock() {
+ lock.readLock().unlock();
+ }
+
+ public void writeLock() {
+ lock.writeLock().lock();
+ }
+
+ public void writeUnlock() {
+ lock.writeLock().unlock();
+ }
+
+ // thrift object
+ private TResourceInfo resourceInfo;
+
+ public long getId() {
+ return id;
+ }
+
+ public JobState getState() {
+ return state;
+ }
+
+ public void setState(JobState state) {
+ this.state = state;
+ }
+
+ public TResourceInfo getResourceInfo() {
+ return resourceInfo;
+ }
+
+ public List<RoutineLoadTask> divideRoutineLoadJob(int currentConcurrentTaskNum) {
+ return null;
+ }
+
+ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException {
+ return 0;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ // TODO(ml)
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ // TODO(ml)
+ }
+}
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
new file mode 100644
index 0000000000..6c3cdefdf0
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.LoadException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.util.Daemon;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+public class RoutineLoadScheduler extends Daemon {
+
+ private static final Logger LOG = LogManager.getLogger(RoutineLoadScheduler.class);
+
+ private RoutineLoad routineLoad = Catalog.getInstance().getRoutineLoadInstance();
+
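+ // one scheduling cycle: for each job in NEED_SCHEDULER state, calculate its concurrent task num,
+ // skip the round if the cluster's total task capacity would be exceeded, otherwise divide the job
+ // into tasks, register them as need-scheduler tasks, and change the job state to RUNNING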
+ @Override
+ protected void runOneCycle() {
+ // get need scheduler routine jobs
+ List<RoutineLoadJob> routineLoadJobList = null;
+ try {
+ routineLoadJobList = getNeedSchedulerRoutineJobs();
+ } catch (LoadException e) {
+ LOG.error("failed to get routine load jobs that need to be scheduled", e);
+ return;
+ }
+
+ LOG.debug("there are {} job need scheduler", routineLoadJobList.size());
+ for (RoutineLoadJob routineLoadJob : routineLoadJobList) {
+ // check whether the total task num would exceed the max concurrent task num of the cluster
+ List<RoutineLoadTask> routineLoadTaskList = null;
+ try {
+ routineLoadJob.writeLock();
+
+ if (routineLoadJob.getState() == RoutineLoadJob.JobState.NEED_SCHEDULER) {
+ int currentConcurrentTaskNum = routineLoadJob.calculateCurrentConcurrentTaskNum();
+ int totalTaskNum = currentConcurrentTaskNum + routineLoad.getIdToRoutineLoadTask().size();
+ if (totalTaskNum > routineLoad.getTotalMaxConcurrentTaskNum()) {
+ LOG.info("job {} concurrent task num {}, current total task num {}. "
+ + "desired total task num {} more then total max task num {}, "
+ + "skip this turn of scheduler",
+ routineLoadJob.getId(), currentConcurrentTaskNum,
+ routineLoad.getIdToRoutineLoadTask().size(),
+ totalTaskNum, routineLoad.getTotalMaxConcurrentTaskNum());
+ break;
+ }
+ // divide job into tasks
+ routineLoadTaskList = routineLoadJob.divideRoutineLoadJob(currentConcurrentTaskNum);
+
+ // update tasks meta
+ routineLoad.addRoutineLoadTasks(routineLoadTaskList);
+ routineLoad.addNeedSchedulerRoutineLoadTasks(routineLoadTaskList);
+
+ // change job state to running
+ routineLoad.updateRoutineLoadJobState(routineLoadJob, RoutineLoadJob.JobState.RUNNING);
+ }
+ } catch (MetaNotFoundException e) {
+ routineLoad.updateRoutineLoadJobStateNoValid(routineLoadJob, RoutineLoadJob.JobState.CANCELLED);
+ } catch (LoadException e) {
+ LOG.error("failed to scheduler job {} with error massage {}", routineLoadJob.getId(),
+ e.getMessage(), e);
+ routineLoad.removeRoutineLoadTasks(routineLoadTaskList);
+ } finally {
+ routineLoadJob.writeUnlock();
+ }
+ }
+
+ }
+
+ private List<RoutineLoadJob> getNeedSchedulerRoutineJobs() throws LoadException {
+ return routineLoad.getRoutineLoadJobByState(RoutineLoadJob.JobState.NEED_SCHEDULER);
+ }
+
+
+}
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTask.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTask.java
new file mode 100644
index 0000000000..184feee880
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTask.java
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+import org.apache.doris.task.AgentTask;
+import org.apache.doris.thrift.TResourceInfo;
+import org.apache.doris.thrift.TTaskType;
+
+public class RoutineLoadTask extends AgentTask {
+
+ public RoutineLoadTask(TResourceInfo resourceInfo, long backendId, TTaskType taskType,
+ long dbId, long tableId, long partitionId, long indexId, long tabletId, long signature) {
+ super(resourceInfo, backendId, taskType, dbId, tableId, partitionId, indexId, tabletId, signature);
+ }
+}
diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java b/fe/src/main/java/org/apache/doris/persist/EditLog.java
index 5e7879441f..e7e331094e 100644
--- a/fe/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java
@@ -49,6 +49,7 @@ import org.apache.doris.load.ExportMgr;
import org.apache.doris.load.Load;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.load.LoadJob;
+import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.privilege.UserProperty;
import org.apache.doris.mysql.privilege.UserPropertyInfo;
@@ -202,7 +203,7 @@ public class EditLog {
DropPartitionInfo info = (DropPartitionInfo) journal.getData();
LOG.info("Begin to unprotect drop partition. db = " + info.getDbId()
+ " table = " + info.getTableId()
- + " partitionName = " + info.getPartitionName());
+ + " partitionName = " + info.getPartitionName());
catalog.replayDropPartition(info);
break;
}
@@ -505,8 +506,8 @@ public class EditLog {
int version = Integer.parseInt(versionString);
if (catalog.getJournalVersion() > FeConstants.meta_version) {
LOG.error("meta data version is out of date, image: {}. meta: {}."
- + "please update FeConstants.meta_version and restart.",
- catalog.getJournalVersion(), FeConstants.meta_version);
+ + "please update FeConstants.meta_version and restart.",
+ catalog.getJournalVersion(), FeConstants.meta_version);
System.exit(-1);
}
catalog.setJournalVersion(version);
@@ -663,7 +664,7 @@ public class EditLog {
if (LOG.isDebugEnabled()) {
LOG.debug("nextId = {}, numTransactions = {}, totalTimeTransactions = {}, op = {}",
- txId, numTransactions, totalTimeTransactions, op);
+ txId, numTransactions, totalTimeTransactions, op);
}
if (txId == Config.edit_log_roll_num) {
@@ -691,7 +692,7 @@ public class EditLog {
public void logSaveNextId(long nextId) {
logEdit(OperationType.OP_SAVE_NEXTID, new Text(Long.toString(nextId)));
}
-
+
public void logSaveTransactionId(long transactionId) {
logEdit(OperationType.OP_SAVE_TRANSACTION_ID, new Text(Long.toString(transactionId)));
}
@@ -776,6 +777,10 @@ public class EditLog {
logEdit(OperationType.OP_LOAD_DONE, job);
}
+ public void logRoutineLoadJob(RoutineLoadJob job) {
+ logEdit(OperationType.OP_ROUTINE_LOAD_JOB, job);
+ }
+
public void logStartRollup(RollupJob rollupJob) {
logEdit(OperationType.OP_START_ROLLUP, rollupJob);
}
@@ -783,7 +788,7 @@ public class EditLog {
public void logFinishingRollup(RollupJob rollupJob) {
logEdit(OperationType.OP_FINISHING_ROLLUP, rollupJob);
}
-
+
public void logFinishRollup(RollupJob rollupJob) {
logEdit(OperationType.OP_FINISH_ROLLUP, rollupJob);
}
@@ -1027,7 +1032,7 @@ public class EditLog {
public void logDeleteTransactionState(TransactionState transactionState) {
logEdit(OperationType.OP_DELETE_TRANSACTION_STATE, transactionState);
}
-
+
public void logBackupJob(BackupJob job) {
logEdit(OperationType.OP_BACKUP_JOB, job);
}
diff --git a/fe/src/main/java/org/apache/doris/persist/OperationType.java b/fe/src/main/java/org/apache/doris/persist/OperationType.java
index 9d2344f675..491ab4c7e4 100644
--- a/fe/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/src/main/java/org/apache/doris/persist/OperationType.java
@@ -138,4 +138,7 @@ public class OperationType {
public static final short OP_FINISHING_SCHEMA_CHANGE = 103;
public static final short OP_SAVE_TRANSACTION_ID = 104;
+ // routine load 110~120
+ public static final short OP_ROUTINE_LOAD_JOB = 110;
+
}
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
new file mode 100644
index 0000000000..ab88bff6be
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+import com.google.common.collect.Lists;
+import mockit.Deencapsulation;
+import mockit.Delegate;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mocked;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TResourceInfo;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class KafkaRoutineLoadJobTest {
+
+ @Test
+ public void testBeNumMin(@Mocked KafkaConsumer kafkaConsumer,
+ @Injectable PartitionInfo partitionInfo1,
+ @Injectable PartitionInfo partitionInfo2,
+ @Mocked Catalog catalog,
+ @Mocked SystemInfoService systemInfoService,
+ @Mocked Database database) throws MetaNotFoundException {
+ List<PartitionInfo> partitionInfoList = new ArrayList<>();
+ partitionInfoList.add(partitionInfo1);
+ partitionInfoList.add(partitionInfo2);
+ List<Long> beIds = Lists.newArrayList(1L);
+
+ String clusterName = "clusterA";
+
+ new Expectations() {
+ {
+ kafkaConsumer.partitionsFor(anyString, (Duration) any);
+ result = partitionInfoList;
+ Catalog.getCurrentSystemInfo();
+ result = systemInfoService;
+ Catalog.getCurrentCatalog();
+ result = catalog;
+ catalog.getDb(anyLong);
+ result = database;
+ database.getClusterName();
+ result = clusterName;
+ systemInfoService.getClusterBackendIds(clusterName, true);
+ result = beIds;
+ }
+ };
+
+ KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "miaoling", 1L,
+ 1L, "1L", "v1", "", "", 3,
+ RoutineLoadJob.JobState.NEED_SCHEDULER, RoutineLoadJob.DataSourceType.KAFKA, 0, new TResourceInfo(),
+ "", "");
+ Assert.assertEquals(1, kafkaRoutineLoadJob.calculateCurrentConcurrentTaskNum());
+ }
+
+
+ @Test
+ public void testDivideRoutineLoadJob() {
+
+ KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "miaoling", 1L,
+ 1L, "1L", "v1", "", "", 3,
+ RoutineLoadJob.JobState.NEED_SCHEDULER, RoutineLoadJob.DataSourceType.KAFKA, 0, new TResourceInfo(),
+ "", "");
+
+ Deencapsulation.setField(kafkaRoutineLoadJob, "kafkaPartitions", Arrays.asList(1, 4, 6));
+
+ List<RoutineLoadTask> result = kafkaRoutineLoadJob.divideRoutineLoadJob(2);
+ Assert.assertEquals(2, result.size());
+ for (RoutineLoadTask routineLoadTask : result) {
+ KafkaRoutineLoadTask kafkaRoutineLoadTask = (KafkaRoutineLoadTask) routineLoadTask;
+ if (kafkaRoutineLoadTask.getKafkaPartitions().size() == 2) {
+ Assert.assertTrue(kafkaRoutineLoadTask.getKafkaPartitions().contains(1));
+ Assert.assertTrue(kafkaRoutineLoadTask.getKafkaPartitions().contains(6));
+ } else if (kafkaRoutineLoadTask.getKafkaPartitions().size() == 1) {
+ Assert.assertTrue(kafkaRoutineLoadTask.getKafkaPartitions().contains(4));
+ } else {
+ Assert.fail();
+ }
+ }
+ }
+}
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
new file mode 100644
index 0000000000..2a9e4de353
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.LoadException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.persist.EditLog;
+import org.apache.doris.system.SystemInfoService;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Catalog.class})
+public class RoutineLoadSchedulerTest {
+
+ @Test
+ public void testNormalRunOneCycle() throws LoadException, MetaNotFoundException {
+ int taskNum = 1;
+ List<RoutineLoadTask> routineLoadTaskList = new ArrayList<>();
+ KafkaRoutineLoadTask kafkaRoutineLoadTask = EasyMock.createNiceMock(KafkaRoutineLoadTask.class);
+ EasyMock.expect(kafkaRoutineLoadTask.getSignature()).andReturn(1L).anyTimes();
+ EasyMock.replay(kafkaRoutineLoadTask);
+ routineLoadTaskList.add(kafkaRoutineLoadTask);
+
+ KafkaRoutineLoadJob routineLoadJob = EasyMock.createNiceMock(KafkaRoutineLoadJob.class);
+ EasyMock.expect(routineLoadJob.calculateCurrentConcurrentTaskNum()).andReturn(taskNum).anyTimes();
+ EasyMock.expect(routineLoadJob.divideRoutineLoadJob(taskNum)).andReturn(routineLoadTaskList).anyTimes();
+ EasyMock.expect(routineLoadJob.getState()).andReturn(RoutineLoadJob.JobState.NEED_SCHEDULER).anyTimes();
+ EasyMock.replay(routineLoadJob);
+
+ SystemInfoService systemInfoService = EasyMock.createNiceMock(SystemInfoService.class);
+ List<Long> beIds = Arrays.asList(1L, 2L, 3L);
+ EasyMock.expect(systemInfoService.getBackendIds(true)).andReturn(beIds).anyTimes();
+ EasyMock.replay(systemInfoService);
+
+ Catalog catalog = EasyMock.createNiceMock(Catalog.class);
+ EditLog editLog = EasyMock.createNiceMock(EditLog.class);
+ PowerMock.mockStatic(Catalog.class);
+ EasyMock.expect(Catalog.getCurrentSystemInfo()).andReturn(systemInfoService).anyTimes();
+ EasyMock.expect(Catalog.getInstance()).andReturn(catalog).anyTimes();
+ PowerMock.replay(Catalog.class);
+
+
+ RoutineLoad routineLoad = new RoutineLoad();
+ EasyMock.expect(catalog.getEditLog()).andReturn(editLog).anyTimes();
+ EasyMock.expect(catalog.getRoutineLoadInstance()).andReturn(routineLoad).anyTimes();
+ EasyMock.replay(catalog);
+
+ routineLoad.addRoutineLoadJob(routineLoadJob);
+ routineLoad.updateRoutineLoadJobState(routineLoadJob, RoutineLoadJob.JobState.NEED_SCHEDULER);
+
+ RoutineLoadScheduler routineLoadScheduler = new RoutineLoadScheduler();
+ routineLoadScheduler.runOneCycle();
+
+ Assert.assertEquals(1, routineLoad.getIdToRoutineLoadTask().size());
+ Assert.assertEquals(1, routineLoad.getIdToNeedSchedulerRoutineLoadTasks().size());
+ Assert.assertEquals(1, routineLoad.getRoutineLoadJobByState(RoutineLoadJob.JobState.RUNNING).size());
+ Assert.assertEquals(0, routineLoad.getRoutineLoadJobByState(RoutineLoadJob.JobState.NEED_SCHEDULER).size());
+
+ }
+}