Add scheduler routine load job for stream load (#313)

1. fetch routine load jobs in NEED_SCHEDULER state
2. calculate the current concurrent task number of each job
3. divide kafka partitions into tasks (a simplified sketch of this flow appears after the commit metadata below)
EmmyMiao87
2018-11-15 21:04:22 +08:00
committed by Mingyu Chen
parent 8ac9492b11
commit 44029937e4
14 changed files with 1090 additions and 157 deletions
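The three steps above are implemented by the new RoutineLoadScheduler, KafkaRoutineLoadJob.calculateCurrentConcurrentTaskNum and KafkaRoutineLoadJob.divideRoutineLoadJob shown in the diffs below. As a quick orientation, here is a minimal, self-contained sketch of that flow under simplifying assumptions; the Job and Task classes and all other names in this sketch are illustrative stand-ins, not the committed classes.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified sketch of the routine load scheduling flow described above.
// "Job" and "Task" are hypothetical stand-ins for RoutineLoadJob / RoutineLoadTask.
public class SchedulerSketch {

    static class Job {
        final long id;
        final List<Integer> kafkaPartitions;
        final int desireTaskConcurrentNum;

        Job(long id, List<Integer> kafkaPartitions, int desireTaskConcurrentNum) {
            this.id = id;
            this.kafkaPartitions = kafkaPartitions;
            this.desireTaskConcurrentNum = desireTaskConcurrentNum;
        }
    }

    static class Task {
        final List<Integer> kafkaPartitions = new ArrayList<>();
    }

    // Step 2: concurrency is the minimum of partition count, desired concurrency and alive BE count.
    static int calculateConcurrentTaskNum(Job job, int aliveBeNum) {
        return Math.min(job.kafkaPartitions.size(),
                Math.min(job.desireTaskConcurrentNum, aliveBeNum));
    }

    // Step 3: distribute Kafka partitions over the tasks round-robin.
    static List<Task> divideIntoTasks(Job job, int concurrentTaskNum) {
        List<Task> tasks = new ArrayList<>();
        for (int i = 0; i < concurrentTaskNum; i++) {
            tasks.add(new Task());
        }
        for (int i = 0; i < job.kafkaPartitions.size(); i++) {
            tasks.get(i % concurrentTaskNum).kafkaPartitions.add(job.kafkaPartitions.get(i));
        }
        return tasks;
    }

    public static void main(String[] args) {
        // Step 1 (fetching NEED_SCHEDULER jobs from the catalog) is stubbed with a single job.
        Job job = new Job(1L, Arrays.asList(0, 1, 2, 3, 4), 3);
        int aliveBeNum = 2;

        int concurrency = calculateConcurrentTaskNum(job, aliveBeNum);
        List<Task> tasks = divideIntoTasks(job, concurrency);

        System.out.println("concurrency = " + concurrency); // min(5, 3, 2) = 2
        for (Task task : tasks) {
            System.out.println("task partitions: " + task.kafkaPartitions);
        }
    }
}

Running the sketch prints concurrency = 2 (the minimum of 5 partitions, a desired concurrency of 3, and 2 alive BEs) and the round-robin split [0, 2, 4] / [1, 3], which mirrors the behavior exercised in KafkaRoutineLoadJobTest at the end of this commit.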

View File

@ -517,6 +517,12 @@ under the License.
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
<build>

View File

@ -969,7 +969,7 @@ help_stmt ::=
// Export statement
export_stmt ::=
// KW_EXPORT KW_TABLE table_name:tblName opt_using_partition:partitions
// KW_EXPORT KW_TABLE table_name:tblName opt_using_partition:partitions
KW_EXPORT KW_TABLE base_table_ref:tblRef
KW_TO STRING_LITERAL:path
opt_properties:properties

View File

@ -135,6 +135,7 @@ import org.apache.doris.load.LoadChecker;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.LoadJob.JobState;
import org.apache.doris.load.routineload.RoutineLoad;
import org.apache.doris.master.Checkpoint;
import org.apache.doris.master.MetaHelper;
import org.apache.doris.metric.MetricRepo;
@ -240,6 +241,7 @@ public class Catalog {
private ConcurrentHashMap<String, Cluster> nameToCluster;
private Load load;
private RoutineLoad routineLoad;
private ExportMgr exportMgr;
private Clone clone;
private Alter alter;
@ -311,7 +313,7 @@ public class Catalog {
private PullLoadJobMgr pullLoadJobMgr;
private BrokerMgr brokerMgr;
private GlobalTransactionMgr globalTransactionMgr;
private DeployManager deployManager;
@ -370,6 +372,7 @@ public class Catalog {
this.idToDb = new ConcurrentHashMap<>();
this.fullNameToDb = new ConcurrentHashMap<>();
this.load = new Load();
this.routineLoad = new RoutineLoad();
this.exportMgr = new ExportMgr();
this.clone = new Clone();
this.alter = new Alter();
@ -419,7 +422,7 @@ public class Catalog {
this.auth = new PaloAuth();
this.domainResolver = new DomainResolver(auth);
this.esStateStore = new EsStateStore();
}
@ -457,11 +460,11 @@ public class Catalog {
public BrokerMgr getBrokerMgr() {
return brokerMgr;
}
public static GlobalTransactionMgr getCurrentGlobalTransactionMgr() {
return getCurrentCatalog().globalTransactionMgr;
}
public GlobalTransactionMgr getGlobalTransactionMgr() {
return globalTransactionMgr;
}
@ -572,20 +575,20 @@ public class Catalog {
// 5. create es state store
esStateStore.loadTableFromCatalog();
esStateStore.start();
// 6. start state listener thread
createStateListener();
listener.setName("stateListener");
listener.setInterval(STATE_CHANGE_CHECK_INTERVAL_MS);
listener.start();
// 7. start txn cleaner thread
createTxnCleaner();
txnCleaner.setName("txnCleaner");
// the clear threads runs every min(transaction_clean_interval_second,stream_load_default_timeout_second)/10
txnCleaner.setInterval(Math.min(Config.transaction_clean_interval_second,
txnCleaner.setInterval(Math.min(Config.transaction_clean_interval_second,
Config.stream_load_default_timeout_second) * 100L);
}
private void getClusterIdAndRole() throws IOException {
@ -615,7 +618,7 @@ public class Catalog {
if (!roleFile.exists()) {
// The very first time to start the first node of the cluster.
// It should become a Master node (Master node's role is also FOLLOWER, which means electable)
// For compatibility. Because this is the very first time to start, so we arbitrarily choose
// a new name for this node
role = FrontendNodeType.FOLLOWER;
@ -673,7 +676,7 @@ public class Catalog {
} else {
// try to get role and node name from helper node,
// this loop will not end until we get certain role type and name
while(true) {
while (true) {
if (!getFeNodeTypeAndNameFromHelpers()) {
LOG.warn("current node is not added to the group. please add it first. "
+ "sleep 5 seconds and retry, current helper nodes: {}", helperNodes);
@ -700,7 +703,7 @@ public class Catalog {
Pair<String, Integer> rightHelperNode = helperNodes.get(0);
Storage storage = new Storage(IMAGE_DIR);
if (roleFile.exists() && (role != storage.getRole() || !nodeName.equals(storage.getNodeName()))
if (roleFile.exists() && (role != storage.getRole() || !nodeName.equals(storage.getNodeName()))
|| !roleFile.exists()) {
storage.writeFrontendRoleAndNodeName(role, nodeName);
}
@ -773,7 +776,7 @@ public class Catalog {
Preconditions.checkState(helperNodes.size() == 1);
LOG.info("finished to get cluster id: {}, role: {} and node name: {}",
clusterId, role.name(), nodeName);
clusterId, role.name(), nodeName);
}
public static String genFeNodeName(String host, int port, boolean isOldStyle) {
@ -799,7 +802,7 @@ public class Catalog {
conn = (HttpURLConnection) url.openConnection();
if (conn.getResponseCode() != 200) {
LOG.warn("failed to get fe node type from helper node: {}. response code: {}",
helperNode, conn.getResponseCode());
helperNode, conn.getResponseCode());
continue;
}
@ -1002,10 +1005,10 @@ public class Catalog {
// Clone checker
CloneChecker.getInstance().setInterval(Config.clone_checker_interval_second * 1000L);
CloneChecker.getInstance().start();
// Publish Version Daemon
publishVersionDaemon.start();
// Start txn cleaner
txnCleaner.start();
@ -1234,7 +1237,7 @@ public class Catalog {
long loadImageEndTime = System.currentTimeMillis();
LOG.info("finished load image in " + (loadImageEndTime - loadImageStartTime) + " ms");
}
private void recreateTabletInvertIndex() {
if (isCheckpointThread()) {
return;
@ -1390,7 +1393,7 @@ public class Catalog {
param.readFields(dis);
load.setLoadErrorHubInfo(param);
}
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_45) {
// 4. load delete jobs
int deleteJobSize = dis.readInt();
@ -1398,14 +1401,14 @@ public class Catalog {
for (int i = 0; i < deleteJobSize; i++) {
long dbId = dis.readLong();
newChecksum ^= dbId;
int deleteJobCount = dis.readInt();
newChecksum ^= deleteJobCount;
for (int j = 0; j < deleteJobCount; j++) {
LoadJob job = new LoadJob();
job.readFields(dis);
long currentTimeMs = System.currentTimeMillis();
// Delete the history load jobs that are older than
// LABEL_KEEP_MAX_MS
// This job must be FINISHED or CANCELLED
@ -1478,7 +1481,7 @@ public class Catalog {
// init job
Database db = getDb(job.getDbId());
// should check job state here because the job is finished but not removed from alter jobs list
if (db != null && (job.getState() == org.apache.doris.alter.AlterJob.JobState.PENDING
if (db != null && (job.getState() == org.apache.doris.alter.AlterJob.JobState.PENDING
|| job.getState() == org.apache.doris.alter.AlterJob.JobState.RUNNING)) {
job.replayInitJob(db);
}
@ -1532,7 +1535,7 @@ public class Catalog {
@Deprecated
private long loadBackupAndRestoreJob_D(DataInputStream dis, long checksum,
Class<? extends AbstractBackupJob_D> jobClass) throws IOException {
Class<? extends AbstractBackupJob_D> jobClass) throws IOException {
int size = dis.readInt();
long newChecksum = checksum ^ size;
for (int i = 0; i < size; i++) {
@ -1592,14 +1595,14 @@ public class Catalog {
long newChecksum = checksum ^ size;
UserPropertyMgr tmpUserPropertyMgr = new UserPropertyMgr();
tmpUserPropertyMgr.readFields(dis);
// transform it. the old UserPropertyMgr is deprecated
tmpUserPropertyMgr.transform(auth);
return newChecksum;
}
return checksum;
}
public long loadTransactionState(DataInputStream dis, long checksum) throws IOException {
int size = dis.readInt();
long newChecksum = checksum ^ size;
@ -1669,7 +1672,7 @@ public class Catalog {
long saveImageEndTime = System.currentTimeMillis();
LOG.info("finished save image {} in {} ms. checksum is {}",
curFile.getAbsolutePath(), (saveImageEndTime - saveImageStartTime), checksum);
curFile.getAbsolutePath(), (saveImageEndTime - saveImageStartTime), checksum);
}
public long saveHeader(DataOutputStream dos, long replayedJournalId, long checksum) throws IOException {
@ -1785,7 +1788,7 @@ public class Catalog {
// 3. load error hub info
LoadErrorHub.Param param = load.getLoadErrorHubInfo();
param.write(dos);
// 4. save delete load job info
Map<Long, List<LoadJob>> dbToDeleteJobs = load.getDbToDeleteJobs();
int deleteJobSize = dbToDeleteJobs.size();
@ -1873,7 +1876,7 @@ public class Catalog {
auth.write(dos);
return checksum;
}
public long saveTransactionState(DataOutputStream dos, long checksum) throws IOException {
int size = globalTransactionMgr.getTransactionNum();
checksum ^= size;
@ -1914,7 +1917,7 @@ public class Catalog {
}
};
}
public void createTxnCleaner() {
txnCleaner = new Daemon() {
protected void runOneCycle() {
@ -2008,85 +2011,85 @@ public class Catalog {
if (formerFeType == FrontendNodeType.INIT) {
switch (feType) {
case MASTER: {
try {
transferToMaster();
} catch (IOException e) {
e.printStackTrace();
case MASTER: {
try {
transferToMaster();
} catch (IOException e) {
e.printStackTrace();
}
break;
}
break;
}
case UNKNOWN:
case FOLLOWER:
case OBSERVER: {
transferToNonMaster();
break;
}
default:
break;
case UNKNOWN:
case FOLLOWER:
case OBSERVER: {
transferToNonMaster();
break;
}
default:
break;
}
return;
}
if (formerFeType == FrontendNodeType.UNKNOWN) {
switch (feType) {
case MASTER: {
try {
transferToMaster();
} catch (IOException e) {
e.printStackTrace();
case MASTER: {
try {
transferToMaster();
} catch (IOException e) {
e.printStackTrace();
}
break;
}
break;
}
case FOLLOWER:
case OBSERVER: {
transferToNonMaster();
break;
}
default:
case FOLLOWER:
case OBSERVER: {
transferToNonMaster();
break;
}
default:
}
return;
}
if (formerFeType == FrontendNodeType.FOLLOWER) {
switch (feType) {
case MASTER: {
try {
transferToMaster();
} catch (IOException e) {
e.printStackTrace();
case MASTER: {
try {
transferToMaster();
} catch (IOException e) {
e.printStackTrace();
}
break;
}
break;
}
case UNKNOWN:
case OBSERVER: {
transferToNonMaster();
break;
}
default:
case UNKNOWN:
case OBSERVER: {
transferToNonMaster();
break;
}
default:
}
return;
}
if (formerFeType == FrontendNodeType.OBSERVER) {
switch (feType) {
case UNKNOWN: {
transferToNonMaster();
case UNKNOWN: {
transferToNonMaster();
break;
}
default:
}
default:
}
return;
}
if (formerFeType == FrontendNodeType.MASTER) {
switch (feType) {
case UNKNOWN:
case FOLLOWER:
case OBSERVER: {
System.exit(-1);
}
default:
case UNKNOWN:
case FOLLOWER:
case OBSERVER: {
System.exit(-1);
}
default:
}
return;
}
@ -2650,7 +2653,7 @@ public class Catalog {
}
public Pair<Long, Partition> addPartition(Database db, String tableName, OlapTable givenTable,
AddPartitionClause addPartitionClause, boolean isRestore) throws DdlException {
AddPartitionClause addPartitionClause, boolean isRestore) throws DdlException {
SingleRangePartitionDesc singlePartitionDesc = addPartitionClause.getSingeRangePartitionDesc();
DistributionDesc distributionDesc = addPartitionClause.getDistributionDesc();
@ -2768,18 +2771,18 @@ public class Catalog {
try {
long partitionId = getNextId();
Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(),
olapTable.getId(),
partitionId, partitionName,
indexIdToShortKeyColumnCount,
indexIdToSchemaHash,
indexIdToStorageType,
indexIdToSchema,
olapTable.getKeysType(),
distributionInfo,
dataProperty.getStorageMedium(),
singlePartitionDesc.getReplicationNum(),
versionInfo, bfColumns, olapTable.getBfFpp(),
tabletIdSet, isRestore);
olapTable.getId(),
partitionId, partitionName,
indexIdToShortKeyColumnCount,
indexIdToSchemaHash,
indexIdToStorageType,
indexIdToSchema,
olapTable.getKeysType(),
distributionInfo,
dataProperty.getStorageMedium(),
singlePartitionDesc.getReplicationNum(),
versionInfo, bfColumns, olapTable.getBfFpp(),
tabletIdSet, isRestore);
// check again
db.writeLock();
@ -3125,13 +3128,13 @@ public class Catalog {
long backendId = replica.getBackendId();
countDownLatch.addMark(backendId, tabletId);
CreateReplicaTask task = new CreateReplicaTask(backendId, dbId, tableId,
partitionId, indexId, tabletId,
shortKeyColumnCount, schemaHash,
version, versionHash,
keysType,
storageType, storageMedium,
schema, bfColumns, bfFpp,
countDownLatch);
partitionId, indexId, tabletId,
shortKeyColumnCount, schemaHash,
version, versionHash,
keysType,
storageType, storageMedium,
schema, bfColumns, bfFpp,
countDownLatch);
batchTask.addTask(task);
// add to AgentTaskQueue for handling finish report.
// not for resending task
@ -3298,7 +3301,7 @@ public class Catalog {
DataProperty dataProperty = null;
try {
dataProperty = PropertyAnalyzer.analyzeDataProperty(stmt.getProperties(),
DataProperty.DEFAULT_HDD_DATA_PROPERTY);
DataProperty.DEFAULT_HDD_DATA_PROPERTY);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
@ -3316,18 +3319,18 @@ public class Catalog {
// create partition
Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(),
olapTable.getId(),
partitionId, partitionName,
olapTable.getIndexIdToShortKeyColumnCount(),
olapTable.getIndexIdToSchemaHash(),
olapTable.getIndexIdToStorageType(),
olapTable.getIndexIdToSchema(),
keysType,
distributionInfo,
dataProperty.getStorageMedium(),
replicationNum,
versionInfo, bfColumns, bfFpp,
tabletIdSet, isRestore);
olapTable.getId(),
partitionId, partitionName,
olapTable.getIndexIdToShortKeyColumnCount(),
olapTable.getIndexIdToSchemaHash(),
olapTable.getIndexIdToStorageType(),
olapTable.getIndexIdToSchema(),
keysType,
distributionInfo,
dataProperty.getStorageMedium(),
replicationNum,
versionInfo, bfColumns, bfFpp,
tabletIdSet, isRestore);
olapTable.addPartition(partition);
} else if (partitionInfo.getType() == PartitionType.RANGE) {
try {
@ -3349,16 +3352,16 @@ public class Catalog {
for (Map.Entry<String, Long> entry : partitionNameToId.entrySet()) {
DataProperty dataProperty = rangePartitionInfo.getDataProperty(entry.getValue());
Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(),
entry.getValue(), entry.getKey(),
olapTable.getIndexIdToShortKeyColumnCount(),
olapTable.getIndexIdToSchemaHash(),
olapTable.getIndexIdToStorageType(),
olapTable.getIndexIdToSchema(),
keysType, distributionInfo,
dataProperty.getStorageMedium(),
partitionInfo.getReplicationNum(entry.getValue()),
versionInfo, bfColumns, bfFpp,
tabletIdSet, isRestore);
entry.getValue(), entry.getKey(),
olapTable.getIndexIdToShortKeyColumnCount(),
olapTable.getIndexIdToSchemaHash(),
olapTable.getIndexIdToStorageType(),
olapTable.getIndexIdToSchema(),
keysType, distributionInfo,
dataProperty.getStorageMedium(),
partitionInfo.getReplicationNum(entry.getValue()),
versionInfo, bfColumns, bfFpp,
tabletIdSet, isRestore);
olapTable.addPartition(partition);
}
} else {
@ -3409,14 +3412,14 @@ public class Catalog {
return returnTable;
}
private Table createEsTable(Database db, CreateTableStmt stmt) throws DdlException {
String tableName = stmt.getTableName();
// create columns
List<Column> baseSchema = stmt.getColumns();
validateColumns(baseSchema);
// create partition info
PartitionDesc partitionDesc = stmt.getPartitionDesc();
PartitionInfo partitionInfo = null;
@ -3456,7 +3459,7 @@ public class Catalog {
HashDistributionDesc hashDistri = (HashDistributionDesc) stmt.getDistributionDesc();
kuduCreateOpts.addHashPartitions(hashDistri.getDistributionColumnNames(), hashDistri.getBuckets());
KuduPartition hashPartition = KuduPartition.createHashPartition(hashDistri.getDistributionColumnNames(),
hashDistri.getBuckets());
hashDistri.getBuckets());
// 3. partition, if exist
KuduPartition rangePartition = null;
@ -3549,8 +3552,8 @@ public class Catalog {
}
public static void getDdlStmt(Table table, List<String> createTableStmt, List<String> addPartitionStmt,
List<String> createRollupStmt, boolean separatePartition, short replicationNum,
boolean hidePassword) {
List<String> createRollupStmt, boolean separatePartition, short replicationNum,
boolean hidePassword) {
StringBuilder sb = new StringBuilder();
// 1. create table
@ -3565,7 +3568,7 @@ public class Catalog {
// 1.2 other table type
sb.append("CREATE ");
if (table.getType() == TableType.KUDU || table.getType() == TableType.MYSQL
if (table.getType() == TableType.KUDU || table.getType() == TableType.MYSQL
|| table.getType() == TableType.ELASTICSEARCH) {
sb.append("EXTERNAL ");
}
@ -3708,7 +3711,7 @@ public class Catalog {
sb.append(";");
} else if (table.getType() == TableType.ELASTICSEARCH) {
EsTable esTable = (EsTable) table;
// partition
PartitionInfo partitionInfo = esTable.getPartitionInfo();
if (partitionInfo.getType() == PartitionType.RANGE) {
@ -3724,7 +3727,7 @@ public class Catalog {
}
sb.append(")\n()");
}
// properties
sb.append("\nPROPERTIES (\n");
sb.append("\"host\" = \"").append(esTable.getHosts()).append("\",\n");
@ -3823,8 +3826,8 @@ public class Catalog {
}
private void createTablets(String clusterName, MaterializedIndex index, ReplicaState replicaState,
DistributionInfo distributionInfo, long version, long versionHash, short replicationNum,
TabletMeta tabletMeta, Set<Long> tabletIdSet) throws DdlException {
DistributionInfo distributionInfo, long version, long versionHash, short replicationNum,
TabletMeta tabletMeta, Set<Long> tabletIdSet) throws DdlException {
Preconditions.checkArgument(replicationNum > 0);
DistributionInfoType distributionInfoType = distributionInfo.getType();
@ -3865,7 +3868,7 @@ public class Catalog {
if (fullNameToDb.get(dbName) == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName);
}
Table table = null;
db.writeLock();
try {
@ -3958,7 +3961,7 @@ public class Catalog {
}
public void handleJobsWhenDeleteReplica(long tableId, long partitionId, long indexId, long tabletId, long replicaId,
long backendId) {
long backendId) {
// rollup
getRollupHandler().removeReplicaRelatedTask(tableId, partitionId, indexId, tabletId, backendId);
@ -3977,10 +3980,10 @@ public class Catalog {
MaterializedIndex materializedIndex = partition.getIndex(info.getIndexId());
Tablet tablet = materializedIndex.getTablet(info.getTabletId());
Replica replica = new Replica(info.getReplicaId(), info.getBackendId(), info.getVersion(),
info.getVersionHash(), info.getDataSize(), info.getRowCount(), ReplicaState.NORMAL,
info.getLastFailedVersion(),
info.getVersionHash(), info.getDataSize(), info.getRowCount(), ReplicaState.NORMAL,
info.getLastFailedVersion(),
info.getLastFailedVersionHash(),
info.getLastSuccessVersion(),
info.getLastSuccessVersion(),
info.getLastSuccessVersionHash());
tablet.addReplica(replica);
}
@ -4076,7 +4079,7 @@ public class Catalog {
String clusterName = ClusterNamespace.getClusterNameFromFullName(name);
return fullNameToDb.get(ClusterNamespace.getFullName(clusterName, dbName.toLowerCase()));
}
}
}
return null;
}
@ -4197,17 +4200,17 @@ public class Catalog {
&& dataProperty.getCooldownTimeMs() < currentTimeMs) {
// expire. change to HDD.
partitionInfo.setDataProperty(partition.getId(),
DataProperty.DEFAULT_HDD_DATA_PROPERTY);
DataProperty.DEFAULT_HDD_DATA_PROPERTY);
storageMediumMap.put(partitionId, TStorageMedium.HDD);
LOG.debug("partition[{}-{}-{}] storage medium changed from SSD to HDD",
dbId, tableId, partitionId);
dbId, tableId, partitionId);
// log
ModifyPartitionInfo info =
new ModifyPartitionInfo(db.getId(), olapTable.getId(),
partition.getId(),
DataProperty.DEFAULT_HDD_DATA_PROPERTY,
(short) -1);
partition.getId(),
DataProperty.DEFAULT_HDD_DATA_PROPERTY,
(short) -1);
editLog.logModifyPartition(info);
}
} // end for partitions
@ -4247,6 +4250,10 @@ public class Catalog {
return this.load;
}
public RoutineLoad getRoutineLoadInstance() {
return routineLoad;
}
public ExportMgr getExportMgr() {
return this.exportMgr;
}
@ -4320,7 +4327,7 @@ public class Catalog {
}
return this.masterIp;
}
public EsStateStore getEsStateStore() {
return this.esStateStore;
}
@ -5029,7 +5036,6 @@ public class Catalog {
}
/**
*
* @param ctx
* @param clusterName
* @throws DdlException
@ -5037,7 +5043,7 @@ public class Catalog {
public void changeCluster(ConnectContext ctx, String clusterName) throws DdlException {
if (!Catalog.getCurrentCatalog().getAuth().checkCanEnterCluster(ConnectContext.get(), clusterName)) {
ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_AUTHORITY,
ConnectContext.get().getQualifiedUser(), "enter");
ConnectContext.get().getQualifiedUser(), "enter");
}
if (!nameToCluster.containsKey(clusterName)) {
@ -5250,6 +5256,7 @@ public class Catalog {
/**
* get migrate progress , when finish migration, next clonecheck will reset dbState
*
* @return
*/
public Set<BaseParam> getMigrations() {
@ -5322,14 +5329,14 @@ public class Catalog {
List<Long> latestBackendIds = systemInfo.getClusterBackendIds(cluster.getName());
if (latestBackendIds.size() != cluster.getBackendIdList().size()) {
LOG.warn("Cluster:" + cluster.getName() + ", backends in Cluster is "
+ cluster.getBackendIdList().size() + ", backends in SystemInfoService is "
+ cluster.getBackendIdList().size());
LOG.warn("Cluster:" + cluster.getName() + ", backends in Cluster is "
+ cluster.getBackendIdList().size() + ", backends in SystemInfoService is "
+ cluster.getBackendIdList().size());
}
// The number of BE in cluster is not same as in SystemInfoService, when perform 'ALTER
// SYSTEM ADD BACKEND TO ...' or 'ALTER SYSTEM ADD BACKEND ...', because both of them are
// for adding BE to some Cluster, but loadCluster is after loadBackend.
cluster.setBackendIdList(latestBackendIds);
cluster.setBackendIdList(latestBackendIds);
final InfoSchemaDb db = new InfoSchemaDb(cluster.getName());
db.setClusterName(cluster.getName());

View File

@ -0,0 +1,119 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.routineload;
import com.google.common.base.Strings;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.SystemIdGenerator;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TResourceInfo;
import org.apache.doris.thrift.TTaskType;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class KafkaRoutineLoadJob extends RoutineLoadJob {
private static final Logger LOG = LogManager.getLogger(KafkaRoutineLoadJob.class);
private static final String FE_GROUP_ID = "fe_fetch_partitions";
private static final int FETCH_PARTITIONS_TIMEOUT = 10;
private String serverAddress;
private String topic;
// optional
private List<Integer> kafkaPartitions;
public KafkaRoutineLoadJob() {
}
public KafkaRoutineLoadJob(long id, String name, String userName, long dbId, long tableId,
String partitions, String columns, String where, String columnSeparator,
int desireTaskConcurrentNum, JobState state, DataSourceType dataSourceType,
int maxErrorNum, TResourceInfo resourceInfo, String serverAddress, String topic) {
super(id, name, userName, dbId, tableId, partitions, columns, where,
columnSeparator, desireTaskConcurrentNum, state, dataSourceType, maxErrorNum, resourceInfo);
this.serverAddress = serverAddress;
this.topic = topic;
}
@Override
public List<RoutineLoadTask> divideRoutineLoadJob(int currentConcurrentTaskNum) {
// divide kafkaPartitions into tasks
List<KafkaRoutineLoadTask> kafkaRoutineLoadTaskList = new ArrayList<>();
for (int i = 0; i < currentConcurrentTaskNum; i++) {
// TODO(ml): init load task
kafkaRoutineLoadTaskList.add(new KafkaRoutineLoadTask(getResourceInfo(), 0L, TTaskType.PUSH,
dbId, tableId, 0L, 0L, 0L, SystemIdGenerator.getNextId()));
}
for (int i = 0; i < kafkaPartitions.size(); i++) {
kafkaRoutineLoadTaskList.get(i % currentConcurrentTaskNum).addKafkaPartition(kafkaPartitions.get(i));
}
List<RoutineLoadTask> result = new ArrayList<>();
result.addAll(kafkaRoutineLoadTaskList);
return result;
}
@Override
public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException {
updatePartitions();
SystemInfoService systemInfoService = Catalog.getCurrentSystemInfo();
Database db = Catalog.getCurrentCatalog().getDb(dbId);
if (db == null) {
LOG.warn("db {} is not exists from job {}", dbId, id);
throw new MetaNotFoundException("db " + dbId + " is not exists from job " + id);
}
String clusterName = db.getClusterName();
if (Strings.isNullOrEmpty(clusterName)) {
LOG.debug("database {} has no cluster name", dbId);
clusterName = SystemInfoService.DEFAULT_CLUSTER;
}
int aliveBeNum = systemInfoService.getClusterBackendIds(clusterName, true).size();
int partitionNum = kafkaPartitions.size();
LOG.info("current concurrent task number is min "
+ "(current size of partition {}, desire task concurrent num {}, alive be num {})",
partitionNum, desireTaskConcurrentNum, aliveBeNum);
return Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum));
}
private void updatePartitions() {
// if no partitions were specified, fetch all partitions of the topic from Kafka
if (kafkaPartitions == null || kafkaPartitions.size() == 0) {
kafkaPartitions = new ArrayList<>();
Properties props = new Properties();
props.put("bootstrap.servers", this.serverAddress);
props.put("group.id", FE_GROUP_ID);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
List<PartitionInfo> partitionList = consumer.partitionsFor(
topic, Duration.ofSeconds(FETCH_PARTITIONS_TIMEOUT));
for (PartitionInfo partitionInfo : partitionList) {
kafkaPartitions.add(partitionInfo.partition());
}
}
}
}

View File

@ -0,0 +1,24 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.routineload;
public class KafkaRoutineLoadProgress {
private String partitionName;
private long offset;
}

View File

@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.routineload;
import com.google.common.collect.Lists;
import org.apache.doris.thrift.TResourceInfo;
import org.apache.doris.thrift.TTaskType;
import java.util.List;
public class KafkaRoutineLoadTask extends RoutineLoadTask {
private List<Integer> kafkaPartitions;
public KafkaRoutineLoadTask(TResourceInfo resourceInfo, long backendId, TTaskType taskType,
long dbId, long tableId, long partitionId, long indexId, long tabletId, long signature) {
super(resourceInfo, backendId, taskType, dbId, tableId, partitionId, indexId, tabletId, signature);
this.kafkaPartitions = Lists.newArrayList();
}
public void addKafkaPartition(int partition) {
kafkaPartitions.add(partition);
}
public List<Integer> getKafkaPartitions() {
return kafkaPartitions;
}
}

View File

@ -0,0 +1,264 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.routineload;
import com.google.common.collect.Maps;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.LoadException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
public class RoutineLoad {
private static final Logger LOG = LogManager.getLogger(RoutineLoad.class);
private static final int DEFAULT_BE_CONCURRENT_TASK_NUM = 100;
// TODO(ml): real-time calculate by be
private Map<Long, Integer> beIdToMaxConcurrentTasks;
// stream load job meta
private Map<Long, RoutineLoadJob> idToRoutineLoadJob;
private Map<Long, RoutineLoadJob> idToNeedSchedulerRoutineLoadJob;
private Map<Long, RoutineLoadJob> idToRunningRoutineLoadJob;
private Map<Long, RoutineLoadJob> idToCancelledRoutineLoadJob;
// stream load tasks meta (not persistent)
private Map<Long, RoutineLoadTask> idToRoutineLoadTask;
private Map<Long, RoutineLoadTask> idToNeedSchedulerRoutineLoadTask;
private ReentrantReadWriteLock lock;
private void readLock() {
lock.readLock().lock();
}
private void readUnlock() {
lock.readLock().unlock();
}
private void writeLock() {
lock.writeLock().lock();
}
private void writeUnlock() {
lock.writeLock().unlock();
}
public RoutineLoad() {
idToRoutineLoadJob = Maps.newHashMap();
idToNeedSchedulerRoutineLoadJob = Maps.newHashMap();
idToRunningRoutineLoadJob = Maps.newHashMap();
idToCancelledRoutineLoadJob = Maps.newHashMap();
idToRoutineLoadTask = Maps.newHashMap();
idToNeedSchedulerRoutineLoadTask = Maps.newHashMap();
lock = new ReentrantReadWriteLock(true);
}
public int getTotalMaxConcurrentTaskNum() {
readLock();
try {
if (beIdToMaxConcurrentTasks == null) {
beIdToMaxConcurrentTasks = Catalog.getCurrentSystemInfo().getBackendIds(true)
.parallelStream().collect(Collectors.toMap(beId -> beId, beId -> DEFAULT_BE_CONCURRENT_TASK_NUM));
}
return beIdToMaxConcurrentTasks.values().stream().mapToInt(i -> i).sum();
} finally {
readUnlock();
}
}
public void addRoutineLoadJob(RoutineLoadJob routineLoadJob) {
writeLock();
try {
idToRoutineLoadJob.put(routineLoadJob.getId(), routineLoadJob);
} finally {
writeUnlock();
}
}
public void addRoutineLoadTasks(List<RoutineLoadTask> routineLoadTaskList) {
writeLock();
try {
idToRoutineLoadTask.putAll(routineLoadTaskList.parallelStream().collect(
Collectors.toMap(task -> task.getSignature(), task -> task)));
} finally {
writeUnlock();
}
}
public Map<Long, RoutineLoadTask> getIdToRoutineLoadTask() {
return idToRoutineLoadTask;
}
public void addNeedSchedulerRoutineLoadTasks(List<RoutineLoadTask> routineLoadTaskList) {
writeLock();
try {
idToNeedSchedulerRoutineLoadTask.putAll(routineLoadTaskList.parallelStream().collect(
Collectors.toMap(task -> task.getSignature(), task -> task)));
} finally {
writeUnlock();
}
}
public void removeRoutineLoadTasks(List<RoutineLoadTask> routineLoadTasks) {
if (routineLoadTasks != null) {
writeLock();
try {
routineLoadTasks.parallelStream().forEach(task -> idToRoutineLoadTask.remove(task.getSignature()));
routineLoadTasks.parallelStream().forEach(task ->
idToNeedSchedulerRoutineLoadTask.remove(task.getSignature()));
} finally {
writeUnlock();
}
}
}
public Map<Long, RoutineLoadTask> getIdToNeedSchedulerRoutineLoadTasks() {
readLock();
try {
return idToNeedSchedulerRoutineLoadTask;
} finally {
readUnlock();
}
}
public List<RoutineLoadJob> getRoutineLoadJobByState(RoutineLoadJob.JobState jobState) throws LoadException {
List<RoutineLoadJob> jobs = new ArrayList<>();
Collection<RoutineLoadJob> stateJobs = null;
readLock();
LOG.debug("begin to get routine load job by state {}", jobState.name());
try {
switch (jobState) {
case NEED_SCHEDULER:
stateJobs = idToNeedSchedulerRoutineLoadJob.values();
break;
case PAUSED:
throw new LoadException("not support getting paused routine load jobs");
case RUNNING:
stateJobs = idToRunningRoutineLoadJob.values();
break;
case STOPPED:
throw new LoadException("not support getting stopped routine load jobs");
default:
break;
}
if (stateJobs != null) {
jobs.addAll(stateJobs);
LOG.info("got {} routine load jobs by state {}", jobs.size(), jobState.name());
}
} finally {
readUnlock();
}
return jobs;
}
public void updateRoutineLoadJobStateNoValid(RoutineLoadJob routineLoadJob, RoutineLoadJob.JobState jobState) {
writeLock();
try {
RoutineLoadJob.JobState srcJobState = routineLoadJob.getState();
long jobId = routineLoadJob.getId();
LOG.info("begin to change job {} state from {} to {}", jobId, srcJobState, jobState);
switch (jobState) {
case NEED_SCHEDULER:
idToRunningRoutineLoadJob.remove(jobId);
idToNeedSchedulerRoutineLoadJob.put(jobId, routineLoadJob);
break;
case PAUSED:
idToNeedSchedulerRoutineLoadJob.remove(jobId);
idToRunningRoutineLoadJob.remove(jobId);
break;
case RUNNING:
idToNeedSchedulerRoutineLoadJob.remove(jobId, routineLoadJob);
idToRunningRoutineLoadJob.put(jobId, routineLoadJob);
break;
case CANCELLED:
idToRunningRoutineLoadJob.remove(jobId);
idToNeedSchedulerRoutineLoadJob.remove(jobId);
idToCancelledRoutineLoadJob.put(jobId, routineLoadJob);
break;
case STOPPED:
idToRunningRoutineLoadJob.remove(jobId);
idToNeedSchedulerRoutineLoadJob.remove(jobId);
break;
default:
break;
}
routineLoadJob.setState(jobState);
Catalog.getInstance().getEditLog().logRoutineLoadJob(routineLoadJob);
} finally {
writeUnlock();
}
}
public void updateRoutineLoadJobState(RoutineLoadJob routineLoadJob, RoutineLoadJob.JobState jobState)
throws LoadException {
writeLock();
try {
RoutineLoadJob.JobState srcJobState = routineLoadJob.getState();
long jobId = routineLoadJob.getId();
LOG.info("begin to change job {} state from {} to {}", jobId, srcJobState, jobState);
checkStateTransform(srcJobState, jobState);
switch (jobState) {
case NEED_SCHEDULER:
idToRunningRoutineLoadJob.remove(jobId);
idToNeedSchedulerRoutineLoadJob.put(jobId, routineLoadJob);
break;
case PAUSED:
idToNeedSchedulerRoutineLoadJob.remove(jobId);
idToRunningRoutineLoadJob.remove(jobId);
break;
case RUNNING:
idToNeedSchedulerRoutineLoadJob.remove(jobId, routineLoadJob);
idToRunningRoutineLoadJob.put(jobId, routineLoadJob);
break;
case CANCELLED:
idToRunningRoutineLoadJob.remove(jobId);
idToNeedSchedulerRoutineLoadJob.remove(jobId);
idToCancelledRoutineLoadJob.put(jobId, routineLoadJob);
break;
case STOPPED:
idToRunningRoutineLoadJob.remove(jobId);
idToNeedSchedulerRoutineLoadJob.remove(jobId);
break;
default:
break;
}
routineLoadJob.setState(jobState);
Catalog.getInstance().getEditLog().logRoutineLoadJob(routineLoadJob);
} finally {
writeUnlock();
}
}
private void checkStateTransform(RoutineLoadJob.JobState currentState, RoutineLoadJob.JobState desireState)
throws LoadException {
if (currentState == RoutineLoadJob.JobState.PAUSED && desireState == RoutineLoadJob.JobState.NEED_SCHEDULER) {
throw new LoadException("could not transform " + currentState + " to " + desireState);
} else if (currentState == RoutineLoadJob.JobState.CANCELLED ||
currentState == RoutineLoadJob.JobState.STOPPED) {
throw new LoadException("could not transform " + currentState + " to " + desireState);
}
}
}

View File

@ -0,0 +1,148 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.routineload;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Writable;
import org.apache.doris.thrift.TResourceInfo;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* A routine load job continuously streams data from a streaming source into Doris.
* It is intended for load jobs that ingest data continuously.
* The properties include stream load properties and job properties.
* desireTaskConcurrentNum is the number of concurrent stream load tasks the user expects.
* Different streaming sources such as KAFKA are supported.
*/
public class RoutineLoadJob implements Writable {
public enum JobState {
NEED_SCHEDULER,
RUNNING,
PAUSED,
STOPPED,
CANCELLED
}
public enum DataSourceType {
KAFKA
}
protected long id;
protected String name;
protected String userName;
protected long dbId;
protected long tableId;
protected String partitions;
protected String columns;
protected String where;
protected String columnSeparator;
protected int desireTaskConcurrentNum;
protected JobState state;
protected DataSourceType dataSourceType;
// max number of error rows allowed per ten thousand rows of data
protected int maxErrorNum;
protected String progress;
protected ReentrantReadWriteLock lock;
// TODO(ml): error sample
public RoutineLoadJob() {
}
public RoutineLoadJob(long id, String name, String userName, long dbId, long tableId,
String partitions, String columns, String where, String columnSeparator,
int desireTaskConcurrentNum, JobState state, DataSourceType dataSourceType,
int maxErrorNum, TResourceInfo resourceInfo) {
this.id = id;
this.name = name;
this.userName = userName;
this.dbId = dbId;
this.tableId = tableId;
this.partitions = partitions;
this.columns = columns;
this.where = where;
this.columnSeparator = columnSeparator;
this.desireTaskConcurrentNum = desireTaskConcurrentNum;
this.state = state;
this.dataSourceType = dataSourceType;
this.maxErrorNum = maxErrorNum;
this.resourceInfo = resourceInfo;
this.progress = "";
lock = new ReentrantReadWriteLock(true);
}
public void readLock() {
lock.readLock().lock();
}
public void readUnlock() {
lock.readLock().unlock();
}
public void writeLock() {
lock.writeLock().lock();
}
public void writeUnlock() {
lock.writeLock().unlock();
}
// thrift object
private TResourceInfo resourceInfo;
public long getId() {
return id;
}
public JobState getState() {
return state;
}
public void setState(JobState state) {
this.state = state;
}
public TResourceInfo getResourceInfo() {
return resourceInfo;
}
public List<RoutineLoadTask> divideRoutineLoadJob(int currentConcurrentTaskNum) {
return null;
}
public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException {
return 0;
}
@Override
public void write(DataOutput out) throws IOException {
// TODO(ml)
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO(ml)
}
}

View File

@ -0,0 +1,92 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.routineload;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.util.Daemon;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
public class RoutineLoadScheduler extends Daemon {
private static final Logger LOG = LogManager.getLogger(RoutineLoadScheduler.class);
private RoutineLoad routineLoad = Catalog.getInstance().getRoutineLoadInstance();
@Override
protected void runOneCycle() {
// get need scheduler routine jobs
List<RoutineLoadJob> routineLoadJobList = null;
try {
routineLoadJobList = getNeedSchedulerRoutineJobs();
} catch (LoadException e) {
// bail out of this cycle, otherwise routineLoadJobList.size() below would throw a NullPointerException
LOG.error("failed to get need scheduler routine jobs", e);
return;
}
LOG.debug("there are {} jobs that need to be scheduled", routineLoadJobList.size());
for (RoutineLoadJob routineLoadJob : routineLoadJobList) {
// check whether the total number of tasks would exceed the cluster's max concurrent tasks
List<RoutineLoadTask> routineLoadTaskList = null;
try {
routineLoadJob.writeLock();
if (routineLoadJob.getState() == RoutineLoadJob.JobState.NEED_SCHEDULER) {
int currentConcurrentTaskNum = routineLoadJob.calculateCurrentConcurrentTaskNum();
int totalTaskNum = currentConcurrentTaskNum + routineLoad.getIdToRoutineLoadTask().size();
if (totalTaskNum > routineLoad.getTotalMaxConcurrentTaskNum()) {
LOG.info("job {} concurrent task num {}, current total task num {}. "
+ "desired total task num {} more then total max task num {}, "
+ "skip this turn of scheduler",
routineLoadJob.getId(), currentConcurrentTaskNum,
routineLoad.getIdToRoutineLoadTask().size(),
totalTaskNum, routineLoad.getTotalMaxConcurrentTaskNum());
break;
}
// divide job into tasks
routineLoadTaskList = routineLoadJob.divideRoutineLoadJob(currentConcurrentTaskNum);
// update tasks meta
routineLoad.addRoutineLoadTasks(routineLoadTaskList);
routineLoad.addNeedSchedulerRoutineLoadTasks(routineLoadTaskList);
// change job state to running
routineLoad.updateRoutineLoadJobState(routineLoadJob, RoutineLoadJob.JobState.RUNNING);
}
} catch (MetaNotFoundException e) {
routineLoad.updateRoutineLoadJobStateNoValid(routineLoadJob, RoutineLoadJob.JobState.CANCELLED);
} catch (LoadException e) {
LOG.error("failed to scheduler job {} with error massage {}", routineLoadJob.getId(),
e.getMessage(), e);
routineLoad.removeRoutineLoadTasks(routineLoadTaskList);
} finally {
routineLoadJob.writeUnlock();
}
}
}
private List<RoutineLoadJob> getNeedSchedulerRoutineJobs() throws LoadException {
return routineLoad.getRoutineLoadJobByState(RoutineLoadJob.JobState.NEED_SCHEDULER);
}
}

View File

@ -0,0 +1,30 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.routineload;
import org.apache.doris.task.AgentTask;
import org.apache.doris.thrift.TResourceInfo;
import org.apache.doris.thrift.TTaskType;
public class RoutineLoadTask extends AgentTask {
public RoutineLoadTask(TResourceInfo resourceInfo, long backendId, TTaskType taskType,
long dbId, long tableId, long partitionId, long indexId, long tabletId, long signature) {
super(resourceInfo, backendId, taskType, dbId, tableId, partitionId, indexId, tabletId, signature);
}
}

View File

@ -49,6 +49,7 @@ import org.apache.doris.load.ExportMgr;
import org.apache.doris.load.Load;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.privilege.UserProperty;
import org.apache.doris.mysql.privilege.UserPropertyInfo;
@ -202,7 +203,7 @@ public class EditLog {
DropPartitionInfo info = (DropPartitionInfo) journal.getData();
LOG.info("Begin to unprotect drop partition. db = " + info.getDbId()
+ " table = " + info.getTableId()
+ " partitionName = " + info.getPartitionName());
+ " partitionName = " + info.getPartitionName());
catalog.replayDropPartition(info);
break;
}
@ -505,8 +506,8 @@ public class EditLog {
int version = Integer.parseInt(versionString);
if (catalog.getJournalVersion() > FeConstants.meta_version) {
LOG.error("meta data version is out of date, image: {}. meta: {}."
+ "please update FeConstants.meta_version and restart.",
catalog.getJournalVersion(), FeConstants.meta_version);
+ "please update FeConstants.meta_version and restart.",
catalog.getJournalVersion(), FeConstants.meta_version);
System.exit(-1);
}
catalog.setJournalVersion(version);
@ -663,7 +664,7 @@ public class EditLog {
if (LOG.isDebugEnabled()) {
LOG.debug("nextId = {}, numTransactions = {}, totalTimeTransactions = {}, op = {}",
txId, numTransactions, totalTimeTransactions, op);
txId, numTransactions, totalTimeTransactions, op);
}
if (txId == Config.edit_log_roll_num) {
@ -691,7 +692,7 @@ public class EditLog {
public void logSaveNextId(long nextId) {
logEdit(OperationType.OP_SAVE_NEXTID, new Text(Long.toString(nextId)));
}
public void logSaveTransactionId(long transactionId) {
logEdit(OperationType.OP_SAVE_TRANSACTION_ID, new Text(Long.toString(transactionId)));
}
@ -776,6 +777,10 @@ public class EditLog {
logEdit(OperationType.OP_LOAD_DONE, job);
}
public void logRoutineLoadJob(RoutineLoadJob job) {
logEdit(OperationType.OP_ROUTINE_LOAD_JOB, job);
}
public void logStartRollup(RollupJob rollupJob) {
logEdit(OperationType.OP_START_ROLLUP, rollupJob);
}
@ -783,7 +788,7 @@ public class EditLog {
public void logFinishingRollup(RollupJob rollupJob) {
logEdit(OperationType.OP_FINISHING_ROLLUP, rollupJob);
}
public void logFinishRollup(RollupJob rollupJob) {
logEdit(OperationType.OP_FINISH_ROLLUP, rollupJob);
}
@ -1027,7 +1032,7 @@ public class EditLog {
public void logDeleteTransactionState(TransactionState transactionState) {
logEdit(OperationType.OP_DELETE_TRANSACTION_STATE, transactionState);
}
public void logBackupJob(BackupJob job) {
logEdit(OperationType.OP_BACKUP_JOB, job);
}

View File

@ -138,4 +138,7 @@ public class OperationType {
public static final short OP_FINISHING_SCHEMA_CHANGE = 103;
public static final short OP_SAVE_TRANSACTION_ID = 104;
// routine load 110~120
public static final short OP_ROUTINE_LOAD_JOB = 110;
}

View File

@ -0,0 +1,106 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.routineload;
import com.google.common.collect.Lists;
import mockit.Deencapsulation;
import mockit.Delegate;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mocked;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TResourceInfo;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.junit.Assert;
import org.junit.Test;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class KafkaRoutineLoadJobTest {
@Test
public void testBeNumMin(@Mocked KafkaConsumer kafkaConsumer,
@Injectable PartitionInfo partitionInfo1,
@Injectable PartitionInfo partitionInfo2,
@Mocked Catalog catalog,
@Mocked SystemInfoService systemInfoService,
@Mocked Database database) throws MetaNotFoundException {
List<PartitionInfo> partitionInfoList = new ArrayList<>();
partitionInfoList.add(partitionInfo1);
partitionInfoList.add(partitionInfo2);
List<Long> beIds = Lists.newArrayList(1L);
String clusterName = "clusterA";
new Expectations() {
{
kafkaConsumer.partitionsFor(anyString, (Duration) any);
result = partitionInfoList;
Catalog.getCurrentSystemInfo();
result = systemInfoService;
Catalog.getCurrentCatalog();
result = catalog;
catalog.getDb(anyLong);
result = database;
database.getClusterName();
result = clusterName;
systemInfoService.getClusterBackendIds(clusterName, true);
result = beIds;
}
};
KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "miaoling", 1L,
1L, "1L", "v1", "", "", 3,
RoutineLoadJob.JobState.NEED_SCHEDULER, RoutineLoadJob.DataSourceType.KAFKA, 0, new TResourceInfo(),
"", "");
Assert.assertEquals(1, kafkaRoutineLoadJob.calculateCurrentConcurrentTaskNum());
}
@Test
public void testDivideRoutineLoadJob() {
KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "miaoling", 1L,
1L, "1L", "v1", "", "", 3,
RoutineLoadJob.JobState.NEED_SCHEDULER, RoutineLoadJob.DataSourceType.KAFKA, 0, new TResourceInfo(),
"", "");
Deencapsulation.setField(kafkaRoutineLoadJob, "kafkaPartitions", Arrays.asList(1, 4, 6));
List<RoutineLoadTask> result = kafkaRoutineLoadJob.divideRoutineLoadJob(2);
Assert.assertEquals(2, result.size());
for (RoutineLoadTask routineLoadTask : result) {
KafkaRoutineLoadTask kafkaRoutineLoadTask = (KafkaRoutineLoadTask) routineLoadTask;
if (kafkaRoutineLoadTask.getKafkaPartitions().size() == 2) {
Assert.assertTrue(kafkaRoutineLoadTask.getKafkaPartitions().contains(1));
Assert.assertTrue(kafkaRoutineLoadTask.getKafkaPartitions().contains(6));
} else if (kafkaRoutineLoadTask.getKafkaPartitions().size() == 1) {
Assert.assertTrue(kafkaRoutineLoadTask.getKafkaPartitions().contains(4));
} else {
Assert.fail();
}
}
}
}

View File

@ -0,0 +1,86 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.routineload;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.persist.EditLog;
import org.apache.doris.system.SystemInfoService;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@RunWith(PowerMockRunner.class)
@PrepareForTest({Catalog.class})
public class RoutineLoadSchedulerTest {
@Test
public void testNormalRunOneCycle() throws LoadException, MetaNotFoundException {
int taskNum = 1;
List<RoutineLoadTask> routineLoadTaskList = new ArrayList<>();
KafkaRoutineLoadTask kafkaRoutineLoadTask = EasyMock.createNiceMock(KafkaRoutineLoadTask.class);
EasyMock.expect(kafkaRoutineLoadTask.getSignature()).andReturn(1L).anyTimes();
EasyMock.replay(kafkaRoutineLoadTask);
routineLoadTaskList.add(kafkaRoutineLoadTask);
KafkaRoutineLoadJob routineLoadJob = EasyMock.createNiceMock(KafkaRoutineLoadJob.class);
EasyMock.expect(routineLoadJob.calculateCurrentConcurrentTaskNum()).andReturn(taskNum).anyTimes();
EasyMock.expect(routineLoadJob.divideRoutineLoadJob(taskNum)).andReturn(routineLoadTaskList).anyTimes();
EasyMock.expect(routineLoadJob.getState()).andReturn(RoutineLoadJob.JobState.NEED_SCHEDULER).anyTimes();
EasyMock.replay(routineLoadJob);
SystemInfoService systemInfoService = EasyMock.createNiceMock(SystemInfoService.class);
List<Long> beIds = Arrays.asList(1L, 2L, 3L);
EasyMock.expect(systemInfoService.getBackendIds(true)).andReturn(beIds).anyTimes();
EasyMock.replay(systemInfoService);
Catalog catalog = EasyMock.createNiceMock(Catalog.class);
EditLog editLog = EasyMock.createNiceMock(EditLog.class);
PowerMock.mockStatic(Catalog.class);
EasyMock.expect(Catalog.getCurrentSystemInfo()).andReturn(systemInfoService).anyTimes();
EasyMock.expect(Catalog.getInstance()).andReturn(catalog).anyTimes();
PowerMock.replay(Catalog.class);
RoutineLoad routineLoad = new RoutineLoad();
EasyMock.expect(catalog.getEditLog()).andReturn(editLog).anyTimes();
EasyMock.expect(catalog.getRoutineLoadInstance()).andReturn(routineLoad).anyTimes();
EasyMock.replay(catalog);
routineLoad.addRoutineLoadJob(routineLoadJob);
routineLoad.updateRoutineLoadJobState(routineLoadJob, RoutineLoadJob.JobState.NEED_SCHEDULER);
RoutineLoadScheduler routineLoadScheduler = new RoutineLoadScheduler();
routineLoadScheduler.runOneCycle();
Assert.assertEquals(1, routineLoad.getIdToRoutineLoadTask().size());
Assert.assertEquals(1, routineLoad.getIdToNeedSchedulerRoutineLoadTasks().size());
Assert.assertEquals(1, routineLoad.getRoutineLoadJobByState(RoutineLoadJob.JobState.RUNNING).size());
Assert.assertEquals(0, routineLoad.getRoutineLoadJobByState(RoutineLoadJob.JobState.NEED_SCHEDULER).size());
}
}