Optimize the publish logic of streaming load (#350)
1. Only collect all error replicas if the publish task has timed out. 2. Add 2 metrics to monitor the success or failure of txns. 3. Change publish timeout to Config.load_straggler_wait_second
This commit is contained in:
@ -330,6 +330,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
|
||||
}
|
||||
auto str = ctx->to_json();
|
||||
HttpChannel::send_reply(req, str);
|
||||
k_streaming_load_current_processing.increment(-1);
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
|
||||
@ -91,10 +91,10 @@ TEST_F(FileHandlerTest, TestWrite) {
|
||||
ASSERT_EQ(22, length);
|
||||
|
||||
|
||||
char* large_bytes2[(1 << 12)];
|
||||
char* large_bytes2[(1 << 10)];
|
||||
memset(large_bytes2, 0, sizeof(char)*((1 << 12)));
|
||||
int i = 1;
|
||||
while (i < 1 << 20) {
|
||||
while (i < 1 << 17) {
|
||||
file_handler.write(large_bytes2, ((1 << 12)));
|
||||
++i;
|
||||
}
|
||||
|
||||
@ -20,6 +20,7 @@ package org.apache.doris.catalog;
|
||||
import org.apache.doris.common.FeMetaVersion;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
@ -178,6 +179,25 @@ public class Replica implements Writable {
|
||||
lastSuccessVersion, lastSuccessVersionHash, dataSize, rowCount);
|
||||
}
|
||||
|
||||
/* last failed version: LFV
|
||||
* last success version: LSV
|
||||
* version: V
|
||||
*
|
||||
* Case 1:
|
||||
* If LFV > LSV, set LSV back to V, which indicates that version between LSV and LFV is invalid.
|
||||
* Clone task will clone the version between LSV and LFV
|
||||
*
|
||||
* Case 2:
|
||||
* LFV changed, set LSV back to V. This is the same as Case 1, because LFV must be larger than LSV.
|
||||
*
|
||||
* Case 3:
|
||||
* LFV remains unchanged, just update LSV, and then check if it falls into Case 1.
|
||||
*
|
||||
* Case 4:
|
||||
* V is larger or equal to LFV, reset LFV. And if V is less than LSV, just set V to LSV. This may
|
||||
* happen when a clone task finished and report version V, but the LSV is already larger than V,
|
||||
* And we know that version between V and LSV is valid, so move V forward to LSV.
|
||||
*/
|
||||
private void updateReplicaInfo(long newVersion, long newVersionHash,
|
||||
long lastFailedVersion, long lastFailedVersionHash,
|
||||
long lastSuccessVersion, long lastSuccessVersionHash,
|
||||
@ -196,11 +216,14 @@ public class Replica implements Writable {
|
||||
lastSuccessVersion = this.version;
|
||||
lastSuccessVersionHash = this.versionHash;
|
||||
}
|
||||
|
||||
// case 1:
|
||||
if (this.lastSuccessVersion <= this.lastFailedVersion) {
|
||||
this.lastSuccessVersion = this.version;
|
||||
this.lastSuccessVersionHash = this.versionHash;
|
||||
}
|
||||
|
||||
// TODO: this case is unknown, add log to observe
|
||||
if (this.version > lastFailedVersion && lastFailedVersion > 0) {
|
||||
LOG.info("current version {} is larger than last failed version {} , "
|
||||
+ "last failed version hash {}, maybe a fatal error or be report version, print a stack here ",
|
||||
@ -209,15 +232,17 @@ public class Replica implements Writable {
|
||||
|
||||
if (lastFailedVersion != this.lastFailedVersion
|
||||
|| this.lastFailedVersionHash != lastFailedVersionHash) {
|
||||
// if last failed version changed, then set last success version to invalid version
|
||||
// Case 2:
|
||||
if (lastFailedVersion > this.lastFailedVersion) {
|
||||
this.lastFailedVersion = lastFailedVersion;
|
||||
this.lastFailedVersionHash = lastFailedVersionHash;
|
||||
this.lastFailedTimestamp = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
this.lastSuccessVersion = this.version;
|
||||
this.lastSuccessVersionHash = this.versionHash;
|
||||
} else {
|
||||
// Case 3:
|
||||
if (lastSuccessVersion >= this.lastSuccessVersion) {
|
||||
this.lastSuccessVersion = lastSuccessVersion;
|
||||
this.lastSuccessVersionHash = lastSuccessVersionHash;
|
||||
@ -228,9 +253,7 @@ public class Replica implements Writable {
|
||||
}
|
||||
}
|
||||
|
||||
// if last failed version <= version, then last failed version is invalid
|
||||
// version xxxx | last failed version xxxx | last success version xxx
|
||||
// if current version == last failed version and version hash != last failed version hash, it means the version report from be is not valid
|
||||
// Case 4:
|
||||
if (this.version > this.lastFailedVersion
|
||||
|| this.version == this.lastFailedVersion && this.versionHash == this.lastFailedVersionHash
|
||||
|| this.version == this.lastFailedVersion && this.lastFailedVersionHash == 0 && this.versionHash != 0) {
|
||||
@ -242,7 +265,7 @@ public class Replica implements Writable {
|
||||
this.versionHash = this.lastSuccessVersionHash;
|
||||
}
|
||||
}
|
||||
// TODO yiguolei use info log here, there maybe a lot of logs, change it to debug when concurrent load is stable
|
||||
|
||||
LOG.debug("update {}", this.toString());
|
||||
}
|
||||
|
||||
|
||||
@ -241,6 +241,9 @@ public class Config extends ConfigBase {
|
||||
* if (current_time - t1) > 300s, then palo will treat C as a failure node
|
||||
* will call transaction manager to commit the transaction and tell transaction manager
|
||||
* that C is failed
|
||||
*
|
||||
* This is also used when waiting for publish tasks
|
||||
*
|
||||
* TODO this parameter is the default value for all job and the DBA could specify it for separate job
|
||||
*/
|
||||
@ConfField public static int load_straggler_wait_second = 300;
|
||||
|
||||
@ -46,9 +46,9 @@ import org.apache.doris.task.ClearTransactionTask;
|
||||
import org.apache.doris.task.CloneTask;
|
||||
import org.apache.doris.task.CreateReplicaTask;
|
||||
import org.apache.doris.task.CreateRollupTask;
|
||||
import org.apache.doris.task.PublishVersionTask;
|
||||
import org.apache.doris.task.DirMoveTask;
|
||||
import org.apache.doris.task.DownloadTask;
|
||||
import org.apache.doris.task.PublishVersionTask;
|
||||
import org.apache.doris.task.PushTask;
|
||||
import org.apache.doris.task.SchemaChangeTask;
|
||||
import org.apache.doris.task.SnapshotTask;
|
||||
@ -555,7 +555,7 @@ public class MasterImpl {
|
||||
if (request.isSetError_tablet_ids()) {
|
||||
errorTabletIds = request.getError_tablet_ids();
|
||||
}
|
||||
PublishVersionTask publishVersionTask = (PublishVersionTask)task;
|
||||
PublishVersionTask publishVersionTask = (PublishVersionTask) task;
|
||||
publishVersionTask.addErrorTablets(errorTabletIds);
|
||||
publishVersionTask.setIsFinished(true);
|
||||
AgentTaskQueue.removeTask(publishVersionTask.getBackendId(),
|
||||
|
||||
@ -17,9 +17,6 @@
|
||||
|
||||
package org.apache.doris.metric;
|
||||
|
||||
import com.codahale.metrics.Histogram;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
|
||||
import org.apache.doris.alter.Alter;
|
||||
import org.apache.doris.alter.AlterJob.JobType;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
@ -33,6 +30,10 @@ import org.apache.doris.persist.EditLog;
|
||||
import org.apache.doris.service.ExecuteEnv;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
|
||||
import com.codahale.metrics.Histogram;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
@ -57,6 +58,8 @@ public final class MetricRepo {
|
||||
public static LongCounterMetric COUNTER_EDIT_LOG_READ;
|
||||
public static LongCounterMetric COUNTER_IMAGE_WRITE;
|
||||
public static LongCounterMetric COUNTER_IMAGE_PUSH;
|
||||
public static LongCounterMetric COUNTER_TXN_FAILED;
|
||||
public static LongCounterMetric COUNTER_TXN_SUCCESS;
|
||||
public static Histogram HISTO_QUERY_LATENCY;
|
||||
public static Histogram HISTO_EDIT_LOG_WRITE_LATENCY;
|
||||
|
||||
@ -161,6 +164,12 @@ public final class MetricRepo {
|
||||
COUNTER_IMAGE_PUSH = new LongCounterMetric("image_push",
|
||||
"counter of image succeeded in pushing to other frontends");
|
||||
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_PUSH);
|
||||
COUNTER_TXN_SUCCESS = new LongCounterMetric("txn_success",
|
||||
"counter of success transactions");
|
||||
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_SUCCESS);
|
||||
COUNTER_TXN_FAILED = new LongCounterMetric("txn_failed",
|
||||
"counter of failed transactions");
|
||||
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_FAILED);
|
||||
|
||||
// 3. histogram
|
||||
HISTO_QUERY_LATENCY = METRIC_REGISTER.histogram(MetricRegistry.name("query", "latency", "ms"));
|
||||
|
||||
@ -634,7 +634,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
return result;
|
||||
}
|
||||
|
||||
// return true if commit success and publish success, return false if publish timout
|
||||
// return true if commit success and publish success, return false if publish timeout
|
||||
private boolean loadTxnCommitImpl(TLoadTxnCommitRequest request) throws UserException {
|
||||
String cluster = request.getCluster();
|
||||
if (Strings.isNullOrEmpty(cluster)) {
|
||||
@ -655,6 +655,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
}
|
||||
throw new UserException("unknown database, database=" + dbName);
|
||||
}
|
||||
|
||||
return Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
|
||||
db, request.getTxnId(),
|
||||
TabletCommitInfo.fromThrift(request.getCommitInfos()),
|
||||
|
||||
@ -17,15 +17,15 @@
|
||||
|
||||
package org.apache.doris.task;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.apache.doris.thrift.TPartitionVersionInfo;
|
||||
import org.apache.doris.thrift.TPublishVersionRequest;
|
||||
import org.apache.doris.thrift.TTaskType;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import org.apache.doris.thrift.TPartitionVersionInfo;
|
||||
import org.apache.doris.thrift.TPublishVersionRequest;
|
||||
import org.apache.doris.thrift.TTaskType;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class PublishVersionTask extends AgentTask {
|
||||
private static final Logger LOG = LogManager.getLogger(PublishVersionTask.class);
|
||||
|
||||
@ -17,12 +17,6 @@
|
||||
|
||||
package org.apache.doris.transaction;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.HashBasedTable;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import org.apache.doris.alter.RollupJob;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Database;
|
||||
@ -49,6 +43,13 @@ import org.apache.doris.task.AgentTaskQueue;
|
||||
import org.apache.doris.task.PublishVersionTask;
|
||||
import org.apache.doris.thrift.TTaskType;
|
||||
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.HashBasedTable;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
@ -82,6 +83,7 @@ public class GlobalTransactionMgr {
|
||||
|
||||
// transactionId -> TransactionState
|
||||
private Map<Long, TransactionState> idToTransactionState;
|
||||
// db id -> (label -> txn id)
|
||||
private com.google.common.collect.Table<Long, String, Long> dbIdToTxnLabels;
|
||||
private Map<Long, Integer> runningTxnNums;
|
||||
private TransactionIdGenerator idGenerator;
|
||||
@ -107,7 +109,7 @@ public class GlobalTransactionMgr {
|
||||
throws AnalysisException, LabelAlreadyExistsException, BeginTransactionException {
|
||||
|
||||
if (Config.disable_load_job) {
|
||||
throw new BeginTransactionException("disable_load_job is set to true, all load job is prevented");
|
||||
throw new BeginTransactionException("disable_load_job is set to true, all load jobs are prevented");
|
||||
}
|
||||
|
||||
writeLock();
|
||||
@ -185,12 +187,11 @@ public class GlobalTransactionMgr {
|
||||
*/
|
||||
public void commitTransaction(long dbId, long transactionId, List<TabletCommitInfo> tabletCommitInfos)
|
||||
throws MetaNotFoundException, TransactionCommitFailedException {
|
||||
|
||||
if (Config.disable_load_job) {
|
||||
throw new TransactionCommitFailedException("disable_load_job is set to true, all load job is prevented");
|
||||
throw new TransactionCommitFailedException("disable_load_job is set to true, all load jobs are prevented");
|
||||
}
|
||||
|
||||
LOG.debug("try to commit transaction:[{}]", transactionId);
|
||||
LOG.debug("try to commit transaction: {}", transactionId);
|
||||
if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) {
|
||||
throw new TransactionCommitFailedException("all partitions have no load data");
|
||||
}
|
||||
@ -260,7 +261,8 @@ public class GlobalTransactionMgr {
|
||||
}
|
||||
// the rolling up index should also be taken care
|
||||
// if the rollup index failed during load, then set its last failed version
|
||||
// if rollup task finished, it should compare version and last failed version, if version < last failed version, then the replica is failed
|
||||
// if rollup task finished, it should compare version and last failed version,
|
||||
// if version < last failed version, then the replica is failed
|
||||
if (rollingUpIndex != null) {
|
||||
allIndices.add(rollingUpIndex);
|
||||
}
|
||||
@ -287,11 +289,12 @@ public class GlobalTransactionMgr {
|
||||
// ignore it but not log it
|
||||
// for example, a replica is in clone state
|
||||
if (replica.getLastFailedVersion() < 0) {
|
||||
++ successReplicaNum;
|
||||
++successReplicaNum;
|
||||
} else {
|
||||
// if this error replica is a base replica and it is under rollup
|
||||
// then remove the rollup task and rollup job will remove the rollup replica automatically
|
||||
// should remove here, because the error replicas not contains this base replica, but it have errors in the past
|
||||
// should remove here, because the error replicas not contains this base replica,
|
||||
// but it has errors in the past
|
||||
if (index.getId() == baseIndex.getId() && rollupJob != null) {
|
||||
LOG.info("the base replica [{}] has error, remove the related rollup replica from rollupjob [{}]",
|
||||
replica, rollupJob);
|
||||
@ -340,12 +343,16 @@ public class GlobalTransactionMgr {
|
||||
}
|
||||
// 5. persistent transactionState
|
||||
unprotectUpsertTransactionState(transactionState);
|
||||
|
||||
// add publish version tasks. set task to null as a placeholder.
|
||||
// tasks will be created when publishing version.
|
||||
for (long backendId : totalInvolvedBackends) {
|
||||
transactionState.addPublishVersionTask(backendId, null);
|
||||
}
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
|
||||
// 6. update nextVersion because of the failure of persistent transaction resulting in error version
|
||||
updateCatalogAfterCommitted(transactionState, db);
|
||||
LOG.info("transaction:[{}] successfully committed", transactionState);
|
||||
@ -385,7 +392,6 @@ public class GlobalTransactionMgr {
|
||||
}
|
||||
|
||||
public void abortTransaction(long transactionId, String reason) throws UserException {
|
||||
|
||||
if (transactionId < 0) {
|
||||
LOG.info("transaction id is {}, less than 0, maybe this is an old type load job, ignore abort operation", transactionId);
|
||||
return;
|
||||
@ -897,12 +903,14 @@ public class GlobalTransactionMgr {
|
||||
}
|
||||
}
|
||||
partition.setNextVersion(partition.getNextVersion() + 1);
|
||||
// the partition's current version hash should be set from partition commit info
|
||||
// for example, fe master's partition current version hash is 123123, fe followers partition current version hash is 3333
|
||||
// they are different, fe master changed, the follower is master now, but its current version hash is 333, if clone happened,
|
||||
// clone finished but its finished version hash != partition's current version hash, then clone is failed
|
||||
// because clone depend on partition's current version to clone
|
||||
partition.setNextVersionHash(Util.generateVersionHash(), partitionCommitInfo.getVersionHash());
|
||||
// Although committed version(hash) is not visible to user,
|
||||
// but they need to be synchronized among Frontends.
|
||||
// because we use committed version(hash) to create clone task, if the first Master FE
|
||||
// send clone task with committed version hash X, and then Master changed, the new Master FE
|
||||
// received the clone task report with version hash X, which does not equal its own committed
|
||||
// version hash, then the clone task fails.
|
||||
partition.setNextVersionHash(Util.generateVersionHash() /* next version hash */,
|
||||
partitionCommitInfo.getVersionHash() /* committed version hash*/);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1015,14 +1023,14 @@ public class GlobalTransactionMgr {
|
||||
if (preStatus == null
|
||||
&& (curTxnState.getTransactionStatus() == TransactionStatus.PREPARE
|
||||
|| curTxnState.getTransactionStatus() == TransactionStatus.COMMITTED)) {
|
||||
++ dbRunningTxnNum;
|
||||
++dbRunningTxnNum;
|
||||
runningTxnNums.put(curTxnState.getDbId(), dbRunningTxnNum);
|
||||
} else if (preStatus != null
|
||||
&& (preStatus == TransactionStatus.PREPARE
|
||||
|| preStatus == TransactionStatus.COMMITTED)
|
||||
&& (curTxnState.getTransactionStatus() == TransactionStatus.VISIBLE
|
||||
|| curTxnState.getTransactionStatus() == TransactionStatus.ABORTED)) {
|
||||
-- dbRunningTxnNum;
|
||||
--dbRunningTxnNum;
|
||||
if (dbRunningTxnNum < 1) {
|
||||
runningTxnNums.remove(curTxnState.getDbId());
|
||||
} else {
|
||||
|
||||
@ -17,11 +17,8 @@
|
||||
|
||||
package org.apache.doris.transaction;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Replica;
|
||||
import org.apache.doris.catalog.Replica.ReplicaState;
|
||||
import org.apache.doris.catalog.TabletInvertedIndex;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.util.Daemon;
|
||||
@ -31,6 +28,10 @@ import org.apache.doris.task.AgentTaskQueue;
|
||||
import org.apache.doris.task.PublishVersionTask;
|
||||
import org.apache.doris.thrift.TPartitionVersionInfo;
|
||||
import org.apache.doris.thrift.TTaskType;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
@ -38,14 +39,14 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class PublishVersionDaemon extends Daemon {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(PublishVersionDaemon.class);
|
||||
|
||||
public PublishVersionDaemon() {
|
||||
super("PUBLISH_VERSION");
|
||||
setInterval(Config.publish_version_interval_millis);
|
||||
super("PUBLISH_VERSION", Config.publish_version_interval_millis);
|
||||
}
|
||||
|
||||
protected void runOneCycle() {
|
||||
@ -64,7 +65,7 @@ public class PublishVersionDaemon extends Daemon {
|
||||
}
|
||||
// TODO yiguolei: could publish transaction state according to multi-tenant cluster info
|
||||
// but should do more work. for example, if a table is migrate from one cluster to another cluster
|
||||
// should pulish to two clusters.
|
||||
// should publish to two clusters.
|
||||
// attention here, we publish transaction state to all backends including dead backend, if not publish to dead backend
|
||||
// then transaction manager will treat it as success
|
||||
List<Long> allBackends = Catalog.getCurrentSystemInfo().getBackendIds(false);
|
||||
@ -97,13 +98,14 @@ public class PublishVersionDaemon extends Daemon {
|
||||
}
|
||||
}
|
||||
Set<Long> publishBackends = transactionState.getPublishVersionTasks().keySet();
|
||||
// publish version tasks are not persisted in catalog, so publishBackends may be empty.
|
||||
// so we have to try publish to all backends;
|
||||
if (publishBackends.isEmpty()) {
|
||||
// could not just add to it, should new a new object, or the back map will destroyed
|
||||
publishBackends = Sets.newHashSet();
|
||||
// this is useful if fe master transfer to another master, because publish version task is not
|
||||
// persistent to edit log, then it should publish to all backends
|
||||
publishBackends.addAll(allBackends);
|
||||
}
|
||||
|
||||
for (long backendId : publishBackends) {
|
||||
PublishVersionTask task = new PublishVersionTask(backendId,
|
||||
transactionState.getTransactionId(),
|
||||
@ -130,6 +132,7 @@ public class PublishVersionDaemon extends Daemon {
|
||||
}
|
||||
Map<Long, PublishVersionTask> transTasks = transactionState.getPublishVersionTasks();
|
||||
Set<Replica> transErrorReplicas = Sets.newHashSet();
|
||||
List<PublishVersionTask> unfinishedTasks = Lists.newArrayList();
|
||||
for (PublishVersionTask publishVersionTask : transTasks.values()) {
|
||||
if (publishVersionTask.isFinished()) {
|
||||
// sometimes backend finish publish version task, but it maybe failed to change transactionid to version for some tablets
|
||||
@ -145,44 +148,48 @@ public class PublishVersionDaemon extends Daemon {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// if task is not finished in time, then set all replica in the backend to error state
|
||||
List<TPartitionVersionInfo> versionInfos = publishVersionTask.getPartitionVersionInfos();
|
||||
Set<Long> errorPartitionIds = Sets.newHashSet();
|
||||
for (TPartitionVersionInfo versionInfo : versionInfos) {
|
||||
errorPartitionIds.add(versionInfo.getPartition_id());
|
||||
}
|
||||
if (errorPartitionIds.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
List<Long> tabletIds = tabletInvertedIndex.getTabletIdsByBackendId(publishVersionTask.getBackendId());
|
||||
for (long tabletId : tabletIds) {
|
||||
long partitionId = tabletInvertedIndex.getPartitionId(tabletId);
|
||||
if (errorPartitionIds.contains(partitionId)) {
|
||||
Replica replica = tabletInvertedIndex.getReplica(tabletId, publishVersionTask.getBackendId());
|
||||
transErrorReplicas.add(replica);
|
||||
unfinishedTasks.add(publishVersionTask);
|
||||
}
|
||||
}
|
||||
|
||||
boolean shouldFinishTxn = false;
|
||||
if (!unfinishedTasks.isEmpty()) {
|
||||
if (transactionState.isPublishTimeout()) {
|
||||
// transaction's publish has timed out, but there are still unfinished tasks.
|
||||
// we need to collect all error replicas, and try to finish this txn.
|
||||
for (PublishVersionTask unfinishedTask : unfinishedTasks) {
|
||||
// set all replica in the backend to error state
|
||||
List<TPartitionVersionInfo> versionInfos = unfinishedTask.getPartitionVersionInfos();
|
||||
Set<Long> errorPartitionIds = Sets.newHashSet();
|
||||
for (TPartitionVersionInfo versionInfo : versionInfos) {
|
||||
errorPartitionIds.add(versionInfo.getPartition_id());
|
||||
}
|
||||
if (errorPartitionIds.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// TODO(cmy): this is inefficient, but just keep it simple. will change it later.
|
||||
List<Long> tabletIds = tabletInvertedIndex.getTabletIdsByBackendId(unfinishedTask.getBackendId());
|
||||
for (long tabletId : tabletIds) {
|
||||
long partitionId = tabletInvertedIndex.getPartitionId(tabletId);
|
||||
if (errorPartitionIds.contains(partitionId)) {
|
||||
Replica replica = tabletInvertedIndex.getReplica(tabletId,
|
||||
unfinishedTask.getBackendId());
|
||||
transErrorReplicas.add(replica);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
shouldFinishTxn = true;
|
||||
}
|
||||
// transaction's publish has not timed out yet; wait for the next round.
|
||||
} else {
|
||||
// all publish tasks are finished, try to finish this txn.
|
||||
shouldFinishTxn = true;
|
||||
}
|
||||
// the timeout value is related with backend num
|
||||
long timeoutMillis = Math.min(Config.publish_version_timeout_second * transTasks.size() * 1000, 10000);
|
||||
// the minimal internal should be 3s
|
||||
timeoutMillis = Math.max(timeoutMillis, 3000);
|
||||
|
||||
// should not wait clone replica or replica's that with last failed version > 0
|
||||
// if wait for them, the publish process will be very slow
|
||||
int normalReplicasNotRespond = 0;
|
||||
Set<Long> allErrorReplicas = Sets.newHashSet();
|
||||
for (Replica replica : transErrorReplicas) {
|
||||
allErrorReplicas.add(replica.getId());
|
||||
if (replica.getState() != ReplicaState.CLONE
|
||||
&& replica.getLastFailedVersion() < 1) {
|
||||
++normalReplicasNotRespond;
|
||||
}
|
||||
}
|
||||
if (normalReplicasNotRespond == 0
|
||||
|| System.currentTimeMillis() - transactionState.getPublishVersionTime() > timeoutMillis) {
|
||||
LOG.debug("transTask num {}, error replica id num {}", transTasks.size(), transErrorReplicas.size());
|
||||
if (shouldFinishTxn) {
|
||||
Set<Long> allErrorReplicas = transErrorReplicas.stream().map(v -> v.getId()).collect(Collectors.toSet());
|
||||
globalTransactionMgr.finishTransaction(transactionState.getTransactionId(), allErrorReplicas);
|
||||
if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) {
|
||||
// if finish transaction state failed, then update publish version time, should check
|
||||
@ -192,11 +199,12 @@ public class PublishVersionDaemon extends Daemon {
|
||||
transactionState, transErrorReplicas.size());
|
||||
}
|
||||
}
|
||||
|
||||
if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
|
||||
for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) {
|
||||
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature());
|
||||
}
|
||||
}
|
||||
}
|
||||
} // end for readyTransactionStates
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,8 +17,10 @@
|
||||
|
||||
package org.apache.doris.transaction;
|
||||
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.metric.MetricRepo;
|
||||
import org.apache.doris.task.PublishVersionTask;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
@ -253,6 +255,13 @@ public class TransactionState implements Writable {
|
||||
this.transactionStatus = transactionStatus;
|
||||
if (transactionStatus == TransactionStatus.VISIBLE) {
|
||||
this.latch.countDown();
|
||||
if (MetricRepo.isInit.get()) {
|
||||
MetricRepo.COUNTER_TXN_SUCCESS.increase(1L);
|
||||
}
|
||||
} else if (transactionStatus == TransactionStatus.ABORTED) {
|
||||
if (MetricRepo.isInit.get()) {
|
||||
MetricRepo.COUNTER_TXN_FAILED.increase(1L);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -321,4 +330,12 @@ public class TransactionState implements Writable {
|
||||
public Map<Long, PublishVersionTask> getPublishVersionTasks() {
|
||||
return publishVersionTasks;
|
||||
}
|
||||
|
||||
public boolean isPublishTimeout() {
|
||||
// timeout is between 3 seconds and Config.load_straggler_wait_second seconds.
|
||||
long timeoutMillis = Math.min(Config.publish_version_timeout_second * publishVersionTasks.size() * 1000,
|
||||
Config.load_straggler_wait_second * 1000);
|
||||
timeoutMillis = Math.max(timeoutMillis, 3000);
|
||||
return System.currentTimeMillis() - publishVersionTime > timeoutMillis;
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,32 +17,26 @@
|
||||
|
||||
package org.apache.doris.load.routineload;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import mockit.Deencapsulation;
|
||||
import mockit.Expectations;
|
||||
import mockit.Injectable;
|
||||
import mockit.Mock;
|
||||
import mockit.MockUp;
|
||||
import mockit.Mocked;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.common.LoadException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.persist.EditLog;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.thrift.TResourceInfo;
|
||||
import org.easymock.EasyMock;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.powermock.api.easymock.PowerMock;
|
||||
import org.powermock.core.classloader.annotations.PrepareForTest;
|
||||
import org.powermock.modules.junit4.PowerMockRunner;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import mockit.Deencapsulation;
|
||||
import mockit.Expectations;
|
||||
import mockit.Injectable;
|
||||
import mockit.Mocked;
|
||||
|
||||
public class RoutineLoadSchedulerTest {
|
||||
|
||||
@Test
|
||||
@ -72,18 +66,6 @@ public class RoutineLoadSchedulerTest {
|
||||
Deencapsulation.setField(routineLoadJob, "kafkaPartitions", partitions);
|
||||
Deencapsulation.setField(routineLoadJob, "desireTaskConcurrentNum", 3);
|
||||
|
||||
new MockUp<Catalog>() {
|
||||
@Mock
|
||||
public SystemInfoService getCurrentSystemInfo() {
|
||||
return systemInfoService;
|
||||
}
|
||||
|
||||
@Mock
|
||||
public Catalog getCurrentCatalog() {
|
||||
return catalog;
|
||||
}
|
||||
};
|
||||
|
||||
new Expectations() {
|
||||
{
|
||||
catalog.getRoutineLoadInstance();
|
||||
|
||||
Reference in New Issue
Block a user