[fix](transaction) fix publish txn fake succ (#24273)

This commit is contained in:
yujun
2023-09-14 21:04:59 +08:00
committed by GitHub
parent 0c30720c99
commit d20365cdcf
13 changed files with 174 additions and 204 deletions

View File

@ -1525,17 +1525,18 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
DorisMetrics::instance()->publish_task_request_total->increment(1);
VLOG_NOTICE << "get publish version task. signature=" << agent_task_req.signature;
std::vector<TTabletId> error_tablet_ids;
std::vector<TTabletId> succ_tablet_ids;
std::set<TTabletId> error_tablet_ids;
std::map<TTabletId, TVersion> succ_tablets;
// partition_id, tablet_id, publish_version
std::vector<std::tuple<int64_t, int64_t, int64_t>> discontinuous_version_tablets;
uint32_t retry_time = 0;
Status status;
bool is_task_timeout = false;
while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
succ_tablets.clear();
error_tablet_ids.clear();
EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids,
&succ_tablet_ids, &discontinuous_version_tablets);
&succ_tablets, &discontinuous_version_tablets);
status = StorageEngine::instance()->execute_task(&engine_task);
if (status.ok()) {
break;
@ -1584,25 +1585,22 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
.tag("transaction_id", publish_version_req.transaction_id)
.tag("error_tablets_num", error_tablet_ids.size())
.error(status);
finish_task_request.__set_error_tablet_ids(error_tablet_ids);
} else {
if (!config::disable_auto_compaction &&
!MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
for (int i = 0; i < succ_tablet_ids.size(); i++) {
for (auto [tablet_id, _] : succ_tablets) {
TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(
succ_tablet_ids[i]);
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
if (tablet != nullptr) {
tablet->publised_count++;
if (tablet->publised_count % 10 == 0) {
StorageEngine::instance()->submit_compaction_task(
tablet, CompactionType::CUMULATIVE_COMPACTION, true);
LOG(INFO) << "trigger compaction succ, tabletid:" << succ_tablet_ids[i]
LOG(INFO) << "trigger compaction succ, tablet_id:" << tablet_id
<< ", publised:" << tablet->publised_count;
}
} else {
LOG(WARNING)
<< "trigger compaction failed, tabletid:" << succ_tablet_ids[i];
LOG(WARNING) << "trigger compaction failed, tablet_id:" << tablet_id;
}
}
}
@ -1611,7 +1609,7 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
LOG_INFO("successfully publish version")
.tag("signature", agent_task_req.signature)
.tag("transaction_id", publish_version_req.transaction_id)
.tag("tablets_num", succ_tablet_ids.size())
.tag("tablets_num", succ_tablets.size())
.tag("cost(s)", cost_second);
}
@ -1620,7 +1618,9 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
finish_task_request.__set_task_type(agent_task_req.task_type);
finish_task_request.__set_signature(agent_task_req.signature);
finish_task_request.__set_report_version(_s_report_version);
finish_task_request.__set_error_tablet_ids(error_tablet_ids);
finish_task_request.__set_succ_tablets(succ_tablets);
finish_task_request.__set_error_tablet_ids(
std::vector<TTabletId>(error_tablet_ids.begin(), error_tablet_ids.end()));
_finish_task(finish_task_request);
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);

View File

@ -306,6 +306,7 @@ constexpr bool capture_stacktrace(int code) {
&& code != ErrorCode::INVERTED_INDEX_BUILD_WAITTING
&& code != ErrorCode::META_KEY_NOT_FOUND
&& code != ErrorCode::PUSH_VERSION_ALREADY_EXIST
&& code != ErrorCode::VERSION_NOT_EXIST
&& code != ErrorCode::TABLE_ALREADY_DELETED_ERROR
&& code != ErrorCode::TRANSACTION_NOT_EXIST
&& code != ErrorCode::TRANSACTION_ALREADY_VISIBLE

View File

@ -490,7 +490,7 @@ Status DataDir::load() {
PendingPublishInfoPB pending_publish_info_pb;
bool parsed = pending_publish_info_pb.ParseFromString(info);
if (!parsed) {
LOG(WARNING) << "parse pending publish info failed, tablt_id: " << tablet_id
LOG(WARNING) << "parse pending publish info failed, tablet_id: " << tablet_id
<< " publish_version: " << publish_version;
}
StorageEngine::instance()->add_async_publish_task(

View File

@ -69,22 +69,17 @@ void TabletPublishStatistics::record_in_bvar() {
}
EnginePublishVersionTask::EnginePublishVersionTask(
const TPublishVersionRequest& publish_version_req, std::vector<TTabletId>* error_tablet_ids,
std::vector<TTabletId>* succ_tablet_ids,
const TPublishVersionRequest& publish_version_req, std::set<TTabletId>* error_tablet_ids,
std::map<TTabletId, TVersion>* succ_tablets,
std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinuous_version_tablets)
: _publish_version_req(publish_version_req),
_error_tablet_ids(error_tablet_ids),
_succ_tablet_ids(succ_tablet_ids),
_succ_tablets(succ_tablets),
_discontinuous_version_tablets(discontinuous_version_tablets) {}
void EnginePublishVersionTask::add_error_tablet_id(int64_t tablet_id) {
std::lock_guard<std::mutex> lck(_tablet_ids_mutex);
_error_tablet_ids->push_back(tablet_id);
}
void EnginePublishVersionTask::add_succ_tablet_id(int64_t tablet_id) {
std::lock_guard<std::mutex> lck(_tablet_ids_mutex);
_succ_tablet_ids->push_back(tablet_id);
_error_tablet_ids->insert(tablet_id);
}
Status EnginePublishVersionTask::finish() {
@ -126,7 +121,7 @@ Status EnginePublishVersionTask::finish() {
// and receive fe's publish version task
// this be must return as an error tablet
if (rowset == nullptr) {
_error_tablet_ids->push_back(tablet_info.tablet_id);
add_error_tablet_id(tablet_info.tablet_id);
res = Status::Error<PUSH_ROWSET_NOT_FOUND>(
"could not find related rowset for tablet {}, txn id {}",
tablet_info.tablet_id, transaction_id);
@ -135,7 +130,7 @@ Status EnginePublishVersionTask::finish() {
TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
tablet_info.tablet_id, tablet_info.tablet_uid);
if (tablet == nullptr) {
_error_tablet_ids->push_back(tablet_info.tablet_id);
add_error_tablet_id(tablet_info.tablet_id);
res = Status::Error<PUSH_TABLE_NOT_EXIST>(
"can't get tablet when publish version. tablet_id={}",
tablet_info.tablet_id);
@ -199,6 +194,7 @@ Status EnginePublishVersionTask::finish() {
}
token->wait();
_succ_tablets->clear();
// check if the related tablet remained all have the version
for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
int64_t partition_id = par_ver_info.partition_id;
@ -209,18 +205,36 @@ Status EnginePublishVersionTask::finish() {
Version version(par_ver_info.version, par_ver_info.version);
for (auto& tablet_info : partition_related_tablet_infos) {
// has to use strict mode to check if check all tablets
if (!_publish_version_req.strict_mode) {
break;
}
TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_info.tablet_id);
auto tablet_id = tablet_info.tablet_id;
if (tablet == nullptr) {
add_error_tablet_id(tablet_info.tablet_id);
add_error_tablet_id(tablet_id);
_succ_tablets->erase(tablet_id);
LOG(WARNING) << "publish version failed on transaction, not found tablet. "
<< "transaction_id=" << transaction_id << ", tablet_id=" << tablet_id
<< ", version=" << par_ver_info.version;
} else {
// check if the version exist, if not exist, then set publish failed
if (!tablet->check_version_exist(version)) {
add_error_tablet_id(tablet_info.tablet_id);
if (_error_tablet_ids->find(tablet_id) == _error_tablet_ids->end()) {
if (tablet->check_version_exist(version)) {
// Ideally we would report the max continuous successful version here,
// but computing it may be too time-consuming for now,
// so currently we just report 0.
(*_succ_tablets)[tablet_id] = 0;
} else {
add_error_tablet_id(tablet_id);
if (res.ok()) {
res = Status::Error<VERSION_NOT_EXIST>(
"tablet {} not exists version {}", tablet_id,
par_ver_info.version);
}
LOG(WARNING) << "publish version failed on transaction, tablet version not "
"exists. "
<< "transaction_id=" << transaction_id
<< ", tablet_id=" << tablet_id
<< ", version=" << par_ver_info.version;
}
}
}
}
@ -280,9 +294,7 @@ void TabletPublishTxnTask::handle() {
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
return;
}
_engine_publish_version_task->add_succ_tablet_id(_tablet_info.tablet_id);
int64_t cost_us = MonotonicMicros() - _stats.submit_time_us;
// print stats if publish cost > 500ms
g_tablet_publish_latency << cost_us;
_stats.record_in_bvar();
LOG(INFO) << "publish version successfully on tablet"

View File

@ -23,7 +23,9 @@
#include <atomic>
#include <condition_variable>
#include <map>
#include <mutex>
#include <set>
#include <vector>
#include "common/status.h"
@ -83,23 +85,22 @@ private:
class EnginePublishVersionTask : public EngineTask {
public:
EnginePublishVersionTask(
const TPublishVersionRequest& publish_version_req, vector<TTabletId>* error_tablet_ids,
std::vector<TTabletId>* succ_tablet_ids,
const TPublishVersionRequest& publish_version_req,
std::set<TTabletId>* error_tablet_ids, std::map<TTabletId, TVersion>* succ_tablets,
std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinous_version_tablets);
~EnginePublishVersionTask() {}
virtual Status finish() override;
void add_error_tablet_id(int64_t tablet_id);
void add_succ_tablet_id(int64_t tablet_id);
int64_t finish_task();
private:
const TPublishVersionRequest& _publish_version_req;
std::mutex _tablet_ids_mutex;
vector<TTabletId>* _error_tablet_ids;
vector<TTabletId>* _succ_tablet_ids;
std::set<TTabletId>* _error_tablet_ids;
std::map<TTabletId, TVersion>* _succ_tablets;
std::vector<std::tuple<int64_t, int64_t, int64_t>>* _discontinuous_version_tablets;
};

View File

@ -67,6 +67,7 @@ import org.apache.thrift.TException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class MasterImpl {
@ -465,6 +466,10 @@ public class MasterImpl {
}
private void finishPublishVersion(AgentTask task, TFinishTaskRequest request) {
Map<Long, Long> succTablets = null;
if (request.isSetSuccTablets()) {
succTablets = request.getSuccTablets();
}
List<Long> errorTabletIds = null;
if (request.isSetErrorTabletIds()) {
errorTabletIds = request.getErrorTabletIds();
@ -478,6 +483,7 @@ public class MasterImpl {
}
PublishVersionTask publishVersionTask = (PublishVersionTask) task;
publishVersionTask.setSuccTablets(succTablets);
publishVersionTask.addErrorTablets(errorTabletIds);
publishVersionTask.setFinished(true);

View File

@ -26,6 +26,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class PublishVersionTask extends AgentTask {
private static final Logger LOG = LogManager.getLogger(PublishVersionTask.class);
@ -34,11 +35,15 @@ public class PublishVersionTask extends AgentTask {
private List<TPartitionVersionInfo> partitionVersionInfos;
private List<Long> errorTablets;
// tabletId => version; the reported version is currently always 0
private Map<Long, Long> succTablets;
public PublishVersionTask(long backendId, long transactionId, long dbId,
List<TPartitionVersionInfo> partitionVersionInfos, long createTime) {
super(null, backendId, TTaskType.PUBLISH_VERSION, dbId, -1L, -1L, -1L, -1L, transactionId, createTime);
this.transactionId = transactionId;
this.partitionVersionInfos = partitionVersionInfos;
this.succTablets = null;
this.errorTablets = new ArrayList<Long>();
this.isFinished = false;
}
@ -57,6 +62,14 @@ public class PublishVersionTask extends AgentTask {
return partitionVersionInfos;
}
/**
 * Returns the tabletId-to-version map of tablets reported as successfully published.
 *
 * <p>The field is initialised to {@code null} by the constructor and only becomes non-null after
 * {@link #setSuccTablets(Map)} is called with a non-null map. The internal map is returned as-is (no copy).
 *
 * <p>NOTE(review): unlike {@code getErrorTablets()}, this accessor is not {@code synchronized};
 * confirm that callers only read it after the task has been marked finished.
 */
public Map<Long, Long> getSuccTablets() {
return succTablets;
}
/**
 * Replaces the tabletId-to-version map of successfully published tablets.
 *
 * <p>The given reference is stored directly (no defensive copy); passing {@code null} resets the field
 * to its initial "nothing reported" state.
 *
 * @param succTablets map from tablet id to reported version, or {@code null}
 */
public void setSuccTablets(Map<Long, Long> succTablets) {
this.succTablets = succTablets;
}
/**
 * Returns the list of tablet ids recorded as failed for this publish task.
 *
 * <p>The call itself is synchronized on this task, but the internal list is returned directly
 * (not a copy), so callers that mutate or iterate it afterwards are not protected by that lock.
 */
public synchronized List<Long> getErrorTablets() {
return errorTablets;
}

View File

@ -54,6 +54,7 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.ClearTransactionTask;
import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.annotations.VisibleForTesting;
@ -868,7 +869,7 @@ public class DatabaseTransactionMgr {
}
}
public void finishTransaction(long transactionId, Set<Long> errorReplicaIds) throws UserException {
public void finishTransaction(long transactionId) throws UserException {
TransactionState transactionState = null;
readLock();
try {
@ -876,14 +877,10 @@ public class DatabaseTransactionMgr {
} finally {
readUnlock();
}
// add all commit errors and publish errors to a single set
if (errorReplicaIds == null) {
errorReplicaIds = Sets.newHashSet();
}
Set<Long> originalErrorReplicas = transactionState.getErrorReplicas();
if (originalErrorReplicas != null) {
errorReplicaIds.addAll(originalErrorReplicas);
}
Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
Map<Long, PublishVersionTask> publishTasks = transactionState.getPublishVersionTasks();
long now = System.currentTimeMillis();
long firstPublishOneSuccTime = transactionState.getFirstPublishOneSuccTime();
@ -980,21 +977,10 @@ public class DatabaseTransactionMgr {
tabletWriteFailedReplicas.clear();
tabletVersionFailedReplicas.clear();
for (Replica replica : tablet.getReplicas()) {
if (!errorReplicaIds.contains(replica.getId())) {
if (replica.checkVersionCatchUp(partition.getVisibleVersion(), true)) {
tabletSuccReplicas.add(replica);
} else {
tabletVersionFailedReplicas.add(replica);
}
} else if (replica.getVersion() >= partitionCommitInfo.getVersion()) {
// the replica's version is larger than or equal to current transaction
// partition's version the replica is normal, then remove it from error replica ids
// TODO(cmy): actually I have no idea why we need this check
tabletSuccReplicas.add(replica);
errorReplicaIds.remove(replica.getId());
} else {
tabletWriteFailedReplicas.add(replica);
}
checkReplicaContinuousVersionSucc(tablet.getId(), replica,
partitionCommitInfo.getVersion(), publishTasks.get(replica.getBackendId()),
errorReplicaIds, tabletSuccReplicas, tabletWriteFailedReplicas,
tabletVersionFailedReplicas);
}
int healthReplicaNum = tabletSuccReplicas.size();
@ -1005,7 +991,7 @@ public class DatabaseTransactionMgr {
LOG.info("publish version quorum succ for transaction {} on tablet {} with version"
+ " {}, and has failed replicas, quorum num {}. table {}, partition {},"
+ " tablet detail: {}",
transactionState, tablet, partitionCommitInfo.getVersion(),
transactionState, tablet.getId(), partitionCommitInfo.getVersion(),
quorumReplicaNum, tableId, partitionId, writeDetail);
}
continue;
@ -1033,8 +1019,8 @@ public class DatabaseTransactionMgr {
LOG.info("publish version timeout succ for transaction {} on tablet {} with version"
+ " {}, and has failed replicas, quorum num {}. table {}, partition {},"
+ " tablet detail: {}",
transactionState, tablet, partitionCommitInfo.getVersion(), quorumReplicaNum,
tableId, partitionId, writeDetail);
transactionState, tablet.getId(), partitionCommitInfo.getVersion(),
quorumReplicaNum, tableId, partitionId, writeDetail);
} else {
publishResult = PublishResult.FAILED;
String errMsg = String.format("publish on tablet %d failed."
@ -1046,8 +1032,8 @@ public class DatabaseTransactionMgr {
LOG.info("publish version failed for transaction {} on tablet {} with version"
+ " {}, and has failed replicas, quorum num {}. table {}, partition {},"
+ " tablet detail: {}",
transactionState, tablet, partitionCommitInfo.getVersion(), quorumReplicaNum,
tableId, partitionId, writeDetail);
transactionState, tablet.getId(), partitionCommitInfo.getVersion(),
quorumReplicaNum, tableId, partitionId, writeDetail);
}
}
}
@ -1093,6 +1079,43 @@ public class DatabaseTransactionMgr {
LOG.info("finish transaction {} successfully, publish result: {}", transactionState, publishResult.name());
}
/**
 * Classifies one replica of a tablet for the publish of {@code version}.
 *
 * <p>Step 1 updates {@code errorReplicaIds} from the publish report of the replica's backend:
 * <ul>
 *   <li>no task, or task not finished: the replica id is added;</li>
 *   <li>task carries a succ-tablets map: the replica id is removed when the map contains
 *       {@code tabletId}, otherwise it is added;</li>
 *   <li>task carries no succ-tablets map (old BE): the replica id is added only when the task's
 *       error-tablet list contains {@code tabletId}; an existing entry is left untouched.</li>
 * </ul>
 *
 * <p>Step 2 appends the replica to exactly one of the three result lists:
 * <ul>
 *   <li>not in {@code errorReplicaIds}: {@code tabletSuccReplicas} when
 *       {@code replica.checkVersionCatchUp(version - 1, true)} holds, else
 *       {@code tabletVersionFailedReplicas};</li>
 *   <li>in {@code errorReplicaIds} but {@code replica.getVersion() >= version}:
 *       {@code tabletSuccReplicas}, and the id is dropped from {@code errorReplicaIds};</li>
 *   <li>otherwise: {@code tabletWriteFailedReplicas}.</li>
 * </ul>
 *
 * @param tabletId id of the tablet the replica belongs to
 * @param replica replica being classified
 * @param version version being published
 * @param backendPublishTask publish task of the replica's backend, may be {@code null}
 * @param errorReplicaIds set of failed replica ids; modified in place
 * @param tabletSuccReplicas output list of successful replicas
 * @param tabletWriteFailedReplicas output list of replicas counted as write failures
 * @param tabletVersionFailedReplicas output list of replicas that failed the version catch-up check
 */
private void checkReplicaContinuousVersionSucc(long tabletId, Replica replica, long version,
        PublishVersionTask backendPublishTask, Set<Long> errorReplicaIds, List<Replica> tabletSuccReplicas,
        List<Replica> tabletWriteFailedReplicas, List<Replica> tabletVersionFailedReplicas) {
    final long replicaId = replica.getId();
    final boolean hasFinishedReport = backendPublishTask != null && backendPublishTask.isFinished();

    // Step 1: fold the backend's publish report into errorReplicaIds.
    if (!hasFinishedReport) {
        errorReplicaIds.add(replicaId);
    } else {
        Map<Long, Long> reportedSuccTablets = backendPublishTask.getSuccTablets();
        if (reportedSuccTablets == null) {
            // for compatibility, old doris BE report only error tablets
            List<Long> reportedErrorTablets = backendPublishTask.getErrorTablets();
            if (reportedErrorTablets != null && reportedErrorTablets.contains(tabletId)) {
                errorReplicaIds.add(replicaId);
            }
        } else if (reportedSuccTablets.containsKey(tabletId)) {
            // new doris BE will report succ tablets
            errorReplicaIds.remove(replicaId);
        } else {
            errorReplicaIds.add(replicaId);
        }
    }

    // Step 2: put the replica into exactly one result list.
    if (errorReplicaIds.contains(replicaId)) {
        if (replica.getVersion() >= version) {
            // the replica already holds this version, so it is no longer treated as an error replica
            tabletSuccReplicas.add(replica);
            errorReplicaIds.remove(replicaId);
        } else {
            tabletWriteFailedReplicas.add(replica);
        }
        return;
    }

    List<Replica> destination = replica.checkVersionCatchUp(version - 1, true)
            ? tabletSuccReplicas
            : tabletVersionFailedReplicas;
    destination.add(replica);
}
protected void unprotectedPreCommitTransaction2PC(TransactionState transactionState, Set<Long> errorReplicaIds,
Map<Long, Set<Long>> tableToPartition, Set<Long> totalInvolvedBackends,
Database db) {

View File

@ -402,13 +402,13 @@ public class GlobalTransactionMgr implements Writable {
/**
* if the table is deleted between commit and publish version, then should ignore the partition
*
* @param dbId
* @param transactionId
* @param errorReplicaIds
* @return
*/
public void finishTransaction(long dbId, long transactionId, Set<Long> errorReplicaIds) throws UserException {
public void finishTransaction(long dbId, long transactionId) throws UserException {
DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId);
dbTransactionMgr.finishTransaction(transactionId, errorReplicaIds);
dbTransactionMgr.finishTransaction(transactionId);
}
/**

View File

@ -17,18 +17,11 @@
package org.apache.doris.transaction;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
@ -36,14 +29,12 @@ import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.thrift.TPartitionVersionInfo;
import org.apache.doris.thrift.TTaskType;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class PublishVersionDaemon extends MasterDaemon {
@ -63,15 +54,6 @@ public class PublishVersionDaemon extends MasterDaemon {
}
}
/**
 * Tells whether every unfinished publish task belongs to a backend that is not alive.
 *
 * @param unfinishedTasks publish tasks that have not finished yet
 * @return {@code true} when {@code checkBackendAlive} is false for the backend of every task
 *         (trivially {@code true} for an empty list); {@code false} as soon as one backend is alive
 */
private boolean isAllBackendsOfUnfinishedTasksDead(List<PublishVersionTask> unfinishedTasks) {
    return unfinishedTasks.stream()
            .noneMatch(pendingTask -> Env.getCurrentSystemInfo().checkBackendAlive(pendingTask.getBackendId()));
}
private void publishVersion() {
GlobalTransactionMgr globalTransactionMgr = Env.getCurrentGlobalTransactionMgr();
List<TransactionState> readyTransactionStates = globalTransactionMgr.getReadyToPublishTransactions();
@ -81,7 +63,8 @@ public class PublishVersionDaemon extends MasterDaemon {
// ATTN: we publish the transaction state to all backends, including dead ones. If a dead backend were
// skipped, the transaction manager would treat it as a success.
List<Long> allBackends = Env.getCurrentSystemInfo().getAllBackendIds(false);
SystemInfoService infoService = Env.getCurrentSystemInfo();
List<Long> allBackends = infoService.getAllBackendIds(false);
if (allBackends.isEmpty()) {
LOG.warn("some transaction state need to publish, but no backend exists");
return;
@ -138,109 +121,17 @@ public class PublishVersionDaemon extends MasterDaemon {
AgentTaskExecutor.submit(batchTask);
}
TabletInvertedIndex tabletInvertedIndex = Env.getCurrentInvertedIndex();
// try to finish the transaction, if failed just retry in next loop
for (TransactionState transactionState : readyTransactionStates) {
Map<Long, PublishVersionTask> transTasks = transactionState.getPublishVersionTasks();
Set<Long> publishErrorReplicaIds = Sets.newHashSet();
List<PublishVersionTask> unfinishedTasks = Lists.newArrayList();
for (PublishVersionTask publishVersionTask : transTasks.values()) {
if (publishVersionTask.isFinished()) {
// sometimes backend finish publish version task,
// but it maybe failed to change transactionid to version for some tablets
// and it will upload the failed tabletinfo to fe and fe will deal with them
List<Long> errorTablets = publishVersionTask.getErrorTablets();
if (errorTablets == null || errorTablets.isEmpty()) {
continue;
} else {
for (long tabletId : errorTablets) {
// tablet inverted index also contains rollingup index
// if tablet meta not contains the tablet, skip this tablet because this tablet is dropped
// from fe
if (tabletInvertedIndex.getTabletMeta(tabletId) == null) {
continue;
}
Replica replica = tabletInvertedIndex.getReplica(
tabletId, publishVersionTask.getBackendId());
if (replica != null) {
publishErrorReplicaIds.add(replica.getId());
} else {
LOG.info("could not find related replica with tabletid={}, backendid={}",
tabletId, publishVersionTask.getBackendId());
}
}
}
} else {
unfinishedTasks.add(publishVersionTask);
}
}
boolean shouldFinishTxn = false;
if (!unfinishedTasks.isEmpty()) {
shouldFinishTxn = isAllBackendsOfUnfinishedTasksDead(unfinishedTasks);
if (transactionState.isPublishTimeout() || shouldFinishTxn) {
// transaction's publish is timeout, but there still has unfinished tasks.
// we need to collect all error replicas, and try to finish this txn.
for (PublishVersionTask unfinishedTask : unfinishedTasks) {
// set all replicas in the backend to error state
List<TPartitionVersionInfo> versionInfos = unfinishedTask.getPartitionVersionInfos();
Set<Long> errorPartitionIds = Sets.newHashSet();
for (TPartitionVersionInfo versionInfo : versionInfos) {
errorPartitionIds.add(versionInfo.getPartitionId());
}
if (errorPartitionIds.isEmpty()) {
continue;
}
Database db = Env.getCurrentInternalCatalog()
.getDbNullable(transactionState.getDbId());
if (db == null) {
LOG.warn("Database [{}] has been dropped.", transactionState.getDbId());
continue;
}
for (long tableId : transactionState.getTableIdList()) {
Table table = db.getTableNullable(tableId);
if (table == null || table.getType() != Table.TableType.OLAP) {
LOG.warn("Table [{}] in database [{}] has been dropped.", tableId, db.getFullName());
continue;
}
OlapTable olapTable = (OlapTable) table;
olapTable.readLock();
try {
for (Long errorPartitionId : errorPartitionIds) {
Partition partition = olapTable.getPartition(errorPartitionId);
if (partition != null) {
List<MaterializedIndex> materializedIndexList
= partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
for (MaterializedIndex materializedIndex : materializedIndexList) {
for (Tablet tablet : materializedIndex.getTablets()) {
Replica replica = tablet.getReplicaByBackendId(
unfinishedTask.getBackendId());
if (replica != null) {
publishErrorReplicaIds.add(replica.getId());
}
}
}
}
}
} finally {
olapTable.readUnlock();
}
}
}
shouldFinishTxn = true;
}
} else {
// all publish tasks are finished, try to finish this txn.
shouldFinishTxn = true;
}
boolean hasBackendAliveAndUnfinishTask = transactionState.getPublishVersionTasks().values().stream()
.anyMatch(task -> !task.isFinished() && infoService.checkBackendAlive(task.getBackendId()));
boolean shouldFinishTxn = !hasBackendAliveAndUnfinishTask || transactionState.isPublishTimeout();
if (shouldFinishTxn) {
try {
// one transaction exception should not affect other transaction
globalTransactionMgr.finishTransaction(transactionState.getDbId(),
transactionState.getTransactionId(), publishErrorReplicaIds);
transactionState.getTransactionId());
} catch (Exception e) {
LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e);
}
@ -248,8 +139,7 @@ public class PublishVersionDaemon extends MasterDaemon {
// if finishing the transaction failed, update the publish send-task time; finishing should be
// checked again after some interval
transactionState.updateSendTaskTime();
LOG.debug("publish version for transaction {} failed, has {} error replicas during publish",
transactionState, publishErrorReplicaIds.size());
LOG.debug("publish version for transaction {} failed", transactionState);
}
}

View File

@ -29,6 +29,7 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.task.PublishVersionTask;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -56,7 +57,17 @@ public class DatabaseTransactionMgrTest {
private static Env slaveEnv;
private static Map<String, Long> LabelToTxnId;
private TransactionState.TxnCoordinator transactionSource = new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe");
private TransactionState.TxnCoordinator transactionSource =
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe");
/**
 * Test helper that simulates every listed backend having completed its publish task.
 *
 * <p>For each backend id, a {@link PublishVersionTask} is created with no partition version infos,
 * marked as finished, and registered on {@code transactionState} under that backend id.
 *
 * @param transactionState transaction that receives the finished publish tasks
 * @param backendIds ids of the backends to register a finished task for
 */
public static void setTransactionFinishPublish(TransactionState transactionState, List<Long> backendIds) {
    backendIds.forEach(id -> {
        final long backendId = id;
        final long txnId = transactionState.getTransactionId();
        final long dbId = transactionState.getDbId();
        PublishVersionTask finishedTask =
                new PublishVersionTask(backendId, txnId, dbId, null, System.currentTimeMillis());
        finishedTask.setFinished(true);
        transactionState.addPublishVersionTask(backendId, finishedTask);
    });
}
@Before
public void setUp() throws InstantiationException, IllegalAccessException, IllegalArgumentException,
@ -100,7 +111,11 @@ public class DatabaseTransactionMgrTest {
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1, transTablets);
masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId1, null);
TransactionState transactionState1 = fakeEditLog.getTransaction(transactionId1);
setTransactionFinishPublish(transactionState1,
Lists.newArrayList(CatalogTestUtil.testBackendId1,
CatalogTestUtil.testBackendId2, CatalogTestUtil.testBackendId3));
masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId1);
labelToTxnId.put(CatalogTestUtil.testTxnLabel1, transactionId1);
TransactionState.TxnCoordinator beTransactionSource = new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.BE, "be1");
@ -120,8 +135,6 @@ public class DatabaseTransactionMgrTest {
labelToTxnId.put(CatalogTestUtil.testTxnLabel3, transactionId3);
labelToTxnId.put(CatalogTestUtil.testTxnLabel4, transactionId4);
TransactionState transactionState1 = fakeEditLog.getTransaction(transactionId1);
FakeEnv.setEnv(slaveEnv);
slaveTransMgr.replayUpsertTransactionState(transactionState1);
return labelToTxnId;

View File

@ -55,7 +55,6 @@ import org.apache.doris.transaction.TransactionState.TxnSourceType;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import mockit.Injectable;
import mockit.Mocked;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@ -66,10 +65,8 @@ import org.junit.Test;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
public class GlobalTransactionMgrTest {
private static FakeEditLog fakeEditLog;
@ -467,9 +464,12 @@ public class GlobalTransactionMgrTest {
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
slaveTransMgr.replayUpsertTransactionState(transactionState);
Set<Long> errorReplicaIds = Sets.newHashSet();
errorReplicaIds.add(CatalogTestUtil.testReplicaId1);
masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId, errorReplicaIds);
DatabaseTransactionMgrTest.setTransactionFinishPublish(transactionState,
Lists.newArrayList(CatalogTestUtil.testBackendId1,
CatalogTestUtil.testBackendId2, CatalogTestUtil.testBackendId3));
transactionState.getPublishVersionTasks()
.get(CatalogTestUtil.testBackendId1).getErrorTablets().add(CatalogTestUtil.testTabletId1);
masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId);
transactionState = fakeEditLog.getTransaction(transactionId);
Assert.assertEquals(TransactionStatus.VISIBLE, transactionState.getTransactionStatus());
// check replica version
@ -524,9 +524,13 @@ public class GlobalTransactionMgrTest {
// master finish the transaction failed
FakeEnv.setEnv(masterEnv);
Set<Long> errorReplicaIds = Sets.newHashSet();
errorReplicaIds.add(CatalogTestUtil.testReplicaId2);
masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId, errorReplicaIds);
DatabaseTransactionMgrTest.setTransactionFinishPublish(transactionState,
Lists.newArrayList(CatalogTestUtil.testBackendId1, CatalogTestUtil.testBackendId2));
// backend2 publish failed
transactionState.getPublishVersionTasks()
.get(CatalogTestUtil.testBackendId2).getErrorTablets().add(CatalogTestUtil.testTabletId1);
masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId);
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
Replica replica1 = tablet.getReplicaById(CatalogTestUtil.testReplicaId1);
Replica replica2 = tablet.getReplicaById(CatalogTestUtil.testReplicaId2);
@ -540,8 +544,12 @@ public class GlobalTransactionMgrTest {
Assert.assertEquals(-1, replica2.getLastFailedVersion());
Assert.assertEquals(CatalogTestUtil.testStartVersion + 1, replica3.getLastFailedVersion());
errorReplicaIds = Sets.newHashSet();
masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId, errorReplicaIds);
// backend2 publish success
Map<Long, Long> backend2SuccTablets = Maps.newHashMap();
backend2SuccTablets.put(CatalogTestUtil.testTabletId1, 0L);
transactionState.getPublishVersionTasks()
.get(CatalogTestUtil.testBackendId2).setSuccTablets(backend2SuccTablets);
masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId);
Assert.assertEquals(TransactionStatus.VISIBLE, transactionState.getTransactionStatus());
Assert.assertEquals(CatalogTestUtil.testStartVersion + 1, replica1.getVersion());
Assert.assertEquals(CatalogTestUtil.testStartVersion + 1, replica2.getVersion());
@ -603,8 +611,10 @@ public class GlobalTransactionMgrTest {
Assert.assertTrue(CatalogTestUtil.compareCatalog(masterEnv, slaveEnv));
// master finish the transaction2
errorReplicaIds = Sets.newHashSet();
masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId2, errorReplicaIds);
DatabaseTransactionMgrTest.setTransactionFinishPublish(transactionState,
Lists.newArrayList(CatalogTestUtil.testBackendId1,
CatalogTestUtil.testBackendId2, CatalogTestUtil.testBackendId3));
masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId2);
Assert.assertEquals(TransactionStatus.VISIBLE, transactionState.getTransactionStatus());
Assert.assertEquals(CatalogTestUtil.testStartVersion + 2, replica1.getVersion());
Assert.assertEquals(CatalogTestUtil.testStartVersion + 2, replica2.getVersion());

View File

@ -65,6 +65,7 @@ struct TFinishTaskRequest {
14: optional list<Types.TTabletId> downloaded_tablet_ids
15: optional i64 copy_size
16: optional i64 copy_time_ms
17: optional map<Types.TTabletId, Types.TVersion> succ_tablets
}
struct TTablet {