diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index ab588f2e03..7faf3439bf 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -246,14 +246,14 @@ bool TaskWorkerPool::_record_task_info( EnumToString(TTaskType, task_type, task_name); if (signature_set.count(signature) > 0) { LOG(INFO) << "type: " << task_name - << ", signature: " << signature << ", already exist." - << ", queue size: " << signature_set.size(); + << ", signature: " << signature << ", already exist" + << ". queue size: " << signature_set.size(); ret = false; } else { signature_set.insert(signature); LOG(INFO) << "type: " << task_name - << ", signature: " << signature << ", has been inserted." - << ", queue size: " << signature_set.size(); + << ", signature: " << signature << ", has been inserted" + << ". queue size: " << signature_set.size(); if (task_type == TTaskType::PUSH) { _s_total_task_user_count[task_type][user] += 1; _s_total_task_count[task_type] += 1; @@ -284,7 +284,7 @@ void TaskWorkerPool::_remove_task_info( std::string task_name; EnumToString(TTaskType, task_type, task_name); LOG(INFO) << "type: " << task_name - << ", signature: " << signature << ", has been erased." + << ", signature: " << signature << ", has been erased" << ", queue size: " << signature_set.size(); } diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 81fd1e28c5..85372458e9 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -190,7 +190,7 @@ OLAPStatus DeltaWriter::close(google::protobuf::RepeatedPtrField* t RETURN_NOT_OK(segment_group->load()); } if (_new_table != nullptr) { - LOG(INFO) << "convert version for schema change"; + LOG(INFO) << "convert version for schema change. txn id: " << _req.transaction_id; { MutexLock push_lock(_new_table->get_push_lock()); // create pending data dir diff --git a/be/src/olap/olap_engine.cpp b/be/src/olap/olap_engine.cpp index e0dd54b8e2..60067e1bba 100644 --- a/be/src/olap/olap_engine.cpp +++ b/be/src/olap/olap_engine.cpp @@ -2470,8 +2470,8 @@ AlterTableStatus OLAPEngine::show_alter_table_status( TTabletId tablet_id, TSchemaHash schema_hash) { LOG(INFO) << "begin to process show alter table status." - << "tablet_id" << tablet_id - << ", schema_hash" << schema_hash; + << "tablet_id=" << tablet_id + << ", schema_hash=" << schema_hash; AlterTableStatus status = ALTER_TABLE_FINISHED; diff --git a/be/src/olap/olap_table.h b/be/src/olap/olap_table.h index ae8d31a4d2..eda2e0d214 100644 --- a/be/src/olap/olap_table.h +++ b/be/src/olap/olap_table.h @@ -565,6 +565,7 @@ public: // 在使用之前对header加锁 void set_cumulative_layer_point(const int32_t new_point) { + LOG(INFO) << "cumulative_layer_point: " << new_point; _header->set_cumulative_layer_point(new_point); } diff --git a/fe/src/main/java/org/apache/doris/alter/AlterJob.java b/fe/src/main/java/org/apache/doris/alter/AlterJob.java index 66ba84926a..5819fdf2cb 100644 --- a/fe/src/main/java/org/apache/doris/alter/AlterJob.java +++ b/fe/src/main/java/org/apache/doris/alter/AlterJob.java @@ -188,7 +188,8 @@ public abstract class AlterJob implements Writable { return false; } else if (!backend.isAlive()) { long currentTime = System.currentTimeMillis(); - if (currentTime - backend.getLastUpdateMs() > Config.max_backend_down_time_second * 1000) { + if (backend.getLastUpdateMs() > 0 + && currentTime - backend.getLastUpdateMs() > Config.max_backend_down_time_second * 1000) { // this backend is done for a long time and not restart automatically. // we consider it as dead return false; diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 92762f9eeb..d70994da38 100644 --- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -1037,14 +1037,13 @@ public class SchemaChangeHandler extends AlterHandler { // set replica state for (Tablet tablet : alterIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - // has to check last failed version here - // if the replica has version 1,2,3,5,6 not has 4 - // then fe will send schema change job to it and it will finish with missing 4 if (replica.getState() == ReplicaState.CLONE || replica.getLastFailedVersion() > 0) { - // just skip it (replica cloned from old schema will be deleted) + // this should not happen, cause we only allow schema change when table is stable. + LOG.error("replica {} of tablet {} on backend {} is not NORMAL: {}", + replica.getId(), tablet.getId(), replica.getBackendId(), replica); continue; } - Preconditions.checkState(replica.getState() == ReplicaState.NORMAL); + Preconditions.checkState(replica.getState() == ReplicaState.NORMAL, replica.getState()); replica.setState(ReplicaState.SCHEMA_CHANGE); } // end for replicas } // end for tablets diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java index e890edf421..2e02c3fb4b 100644 --- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java +++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java @@ -31,6 +31,7 @@ import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.Replica.ReplicaState; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Tablet; +import org.apache.doris.common.Config; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.io.Text; @@ -703,9 +704,11 @@ public class SchemaChangeJob extends AlterJob { continue; } - if (replica.getLastFailedVersion() > 0) { - LOG.warn("replica {} of tablet {} last failed version > 0, set it as bad", - replica, tablet.getId()); + if (replica.getLastFailedVersion() > 0 && System.currentTimeMillis() + - replica.getLastFailedTimestamp() > Config.max_backend_down_time_second + * 1000) { + LOG.warn("replica {} of tablet {} last failed version > 0, " + + "and last for an hour, set it as bad", replica, tablet.getId()); --healthNum; continue; } diff --git a/fe/src/main/java/org/apache/doris/analysis/AbstractBackupStmt.java b/fe/src/main/java/org/apache/doris/analysis/AbstractBackupStmt.java index a1154716e5..d33d961e64 100644 --- a/fe/src/main/java/org/apache/doris/analysis/AbstractBackupStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/AbstractBackupStmt.java @@ -66,9 +66,12 @@ public class AbstractBackupStmt extends DdlStmt { public void analyze(Analyzer analyzer) throws AnalysisException, UserException { labelName.analyze(analyzer); - // check auth - if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); + // user need database level priv(not table level), because when doing restore operation, + // the restore table may be newly created, so we can not judge its privs. + if (!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), + labelName.getDbName(), PrivPredicate.LOAD)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_DB_ACCESS_DENIED, + ConnectContext.get().getQualifiedUser(), labelName.getDbName()); } checkAndNormalizeBackupObjs(); diff --git a/fe/src/main/java/org/apache/doris/analysis/ShowTabletStmt.java b/fe/src/main/java/org/apache/doris/analysis/ShowTabletStmt.java index 74f5786110..3083580cf9 100644 --- a/fe/src/main/java/org/apache/doris/analysis/ShowTabletStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/ShowTabletStmt.java @@ -21,7 +21,6 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.ScalarType; import org.apache.doris.cluster.ClusterNamespace; -import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; @@ -70,7 +69,7 @@ public class ShowTabletStmt extends ShowStmt { } @Override - public void analyze(Analyzer analyzer) throws AnalysisException, UserException { + public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); if (!isShowSingleTablet && Strings.isNullOrEmpty(dbName)) { dbName = analyzer.getDefaultDb(); diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index fd7c88cd14..4b53877cd1 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -4203,7 +4203,7 @@ public class Catalog { AgentTaskQueue.removeReplicaRelatedTasks(backendId, tabletId); } - public void unprotectAddReplica(ReplicaPersistInfo info) { + private void unprotectAddReplica(ReplicaPersistInfo info) { LOG.debug("replay add a replica {}", info); Database db = getDb(info.getDbId()); OlapTable olapTable = (OlapTable) db.getTable(info.getTableId()); @@ -4227,6 +4227,18 @@ public class Catalog { tablet.addReplica(replica); } + private void unprotectUpdateReplica(ReplicaPersistInfo info) { + LOG.debug("replay update a replica {}", info); + Database db = getDb(info.getDbId()); + OlapTable olapTable = (OlapTable) db.getTable(info.getTableId()); + Partition partition = olapTable.getPartition(info.getPartitionId()); + MaterializedIndex materializedIndex = partition.getIndex(info.getIndexId()); + Tablet tablet = materializedIndex.getTablet(info.getTabletId()); + Replica replica = tablet.getReplicaByBackendId(info.getBackendId()); + Preconditions.checkNotNull(replica, info); + replica.updateVersionInfo(info.getVersion(), info.getVersionHash(), info.getDataSize(), info.getRowCount()); + } + public void replayAddReplica(ReplicaPersistInfo info) { Database db = getDb(info.getDbId()); db.writeLock(); @@ -4237,6 +4249,16 @@ public class Catalog { } } + public void replayUpdateReplica(ReplicaPersistInfo info) { + Database db = getDb(info.getDbId()); + db.writeLock(); + try { + unprotectUpdateReplica(info); + } finally { + db.writeUnlock(); + } + } + public void unprotectDeleteReplica(ReplicaPersistInfo info) { Database db = getDb(info.getDbId()); OlapTable olapTable = (OlapTable) db.getTable(info.getTableId()); @@ -4536,7 +4558,7 @@ public class Catalog { } public Pair getHelperNode() { - Preconditions.checkState(helperNodes.size() == 1); + Preconditions.checkState(helperNodes.size() >= 1); return this.helperNodes.get(0); } diff --git a/fe/src/main/java/org/apache/doris/catalog/Replica.java b/fe/src/main/java/org/apache/doris/catalog/Replica.java index f65267f54f..23c80dc7c0 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/src/main/java/org/apache/doris/catalog/Replica.java @@ -266,7 +266,7 @@ public class Replica implements Writable { // TODO: this case is unknown, add log to observe if (this.version > lastFailedVersion && lastFailedVersion > 0) { - LOG.info("current version {} is larger than last failed version {} , " + LOG.debug("current version {} is larger than last failed version {}, " + "last failed version hash {}, maybe a fatal error or be report version, print a stack here ", this.version, lastFailedVersion, lastFailedVersionHash, new Exception()); } @@ -371,8 +371,10 @@ public class Replica implements Writable { strBuffer.append(lastSuccessVersionHash); strBuffer.append(", lastFailedTimestamp="); strBuffer.append(lastFailedTimestamp); - strBuffer.append(", schemaHash"); + strBuffer.append(", schemaHash="); strBuffer.append(schemaHash); + strBuffer.append(", state="); + strBuffer.append(state.name()); strBuffer.append("]"); return strBuffer.toString(); } @@ -391,7 +393,6 @@ public class Replica implements Writable { out.writeLong(lastFailedVersionHash); out.writeLong(lastSuccessVersion); out.writeLong(lastSuccessVersionHash); - } @Override diff --git a/fe/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index 5a02a35bba..a6b7868282 100644 --- a/fe/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -108,7 +108,6 @@ public class TabletInvertedIndex { ListMultimap transactionsToPublish, ListMultimap transactionsToClear, ListMultimap tabletRecoveryMap) { - long start = 0L; readLock(); try { @@ -129,7 +128,7 @@ public class TabletInvertedIndex { if (tabletMeta.containsSchemaHash(backendTabletInfo.getSchema_hash())) { foundTabletsWithValidSchema.add(tabletId); // 1. (intersection) - if (checkSync(replica, backendTabletInfo.getVersion(), + if (needSync(replica, backendTabletInfo.getVersion(), backendTabletInfo.getVersion_hash())) { // need sync tabletSyncMap.put(tabletMeta.getDbId(), tabletId); @@ -142,13 +141,15 @@ public class TabletInvertedIndex { replica.setPathHash(backendTabletInfo.getPath_hash()); } - if (checkNeedRecover(replica, backendTabletInfo.getVersion(), + if (needRecover(replica, tabletMeta.getOldSchemaHash(), + backendTabletInfo.getSchema_hash(), backendTabletInfo.getVersion(), backendTabletInfo.getVersion_hash())) { LOG.warn("replica {} of tablet {} on backend {} need recovery. " - + "replica in FE: {}, report version {}-{}", + + "replica in FE: {}, report version {}-{}, report schema hash: {}", replica.getId(), tabletId, backendId, replica, backendTabletInfo.getVersion(), - backendTabletInfo.getVersion_hash()); + backendTabletInfo.getVersion_hash(), + backendTabletInfo.getSchema_hash()); tabletRecoveryMap.put(tabletMeta.getDbId(), tabletId); } @@ -310,10 +311,10 @@ public class TabletInvertedIndex { return backendIdToReplica.keySet(); } - private boolean checkSync(Replica replicaMeta, long backendVersion, long backendVersionHash) { - long metaVersion = replicaMeta.getVersion(); - long metaVersionHash = replicaMeta.getVersionHash(); - if (metaVersion < backendVersion || (metaVersion == backendVersion && metaVersionHash != backendVersionHash)) { + private boolean needSync(Replica replicaInFe, long backendVersion, long backendVersionHash) { + long versionInFe = replicaInFe.getVersion(); + long versionHashInFe = replicaInFe.getVersionHash(); + if (backendVersion > versionInFe || (versionInFe == backendVersion && versionHashInFe != backendVersionHash)) { return true; } return false; @@ -323,7 +324,13 @@ public class TabletInvertedIndex { * if be's report version < fe's meta version, it means some version is missing in BE * because of some unrecoverable failure. */ - private boolean checkNeedRecover(Replica replicaInFe, long backendVersion, long backendVersionHash) { + private boolean needRecover(Replica replicaInFe, int schemaHashInFe, int schemaHashInBe, + long backendVersion, long backendVersionHash) { + if (schemaHashInFe != schemaHashInBe || backendVersion == -1 && backendVersionHash == 0) { + // no data file exist on BE, maybe this is a newly created schema change tablet. no need to recovery + return false; + } + if (replicaInFe.getVersionHash() == 0 && backendVersion == replicaInFe.getVersion() - 1) { /* * This is very tricky: diff --git a/fe/src/main/java/org/apache/doris/clone/LoadBalancer.java b/fe/src/main/java/org/apache/doris/clone/LoadBalancer.java index d16faab3ee..4924eef5b5 100644 --- a/fe/src/main/java/org/apache/doris/clone/LoadBalancer.java +++ b/fe/src/main/java/org/apache/doris/clone/LoadBalancer.java @@ -228,7 +228,7 @@ public class LoadBalancer { } } if (!setSource) { - throw new SchedException(Status.UNRECOVERABLE, "no replica in high load backend"); + throw new SchedException(Status.UNRECOVERABLE, "unable to take src slot"); } // Select a low load backend as destination. diff --git a/fe/src/main/java/org/apache/doris/clone/TabletChecker.java b/fe/src/main/java/org/apache/doris/clone/TabletChecker.java index 87517493c2..7516d547e3 100644 --- a/fe/src/main/java/org/apache/doris/clone/TabletChecker.java +++ b/fe/src/main/java/org/apache/doris/clone/TabletChecker.java @@ -175,6 +175,8 @@ public class TabletChecker extends Daemon { long totalTabletNum = 0; long unhealthyTabletNum = 0; long addToSchedulerTabletNum = 0; + long tabletInScheduler = 0; + long tabletNotReady = 0; List dbIds = catalog.getDbIds(); OUT: for (Long dbId : dbIds) { @@ -203,6 +205,7 @@ public class TabletChecker extends Daemon { totalTabletNum++; if (tabletScheduler.containsTablet(tablet.getId())) { + tabletInScheduler++; continue; } @@ -225,6 +228,7 @@ public class TabletChecker extends Daemon { unhealthyTabletNum++; if (!tablet.readyToBeRepaired(statusWithPrio.second)) { + tabletNotReady++; continue; } @@ -268,8 +272,8 @@ public class TabletChecker extends Daemon { stat.counterUnhealthyTabletNum.addAndGet(unhealthyTabletNum); stat.counterTabletAddToBeScheduled.addAndGet(addToSchedulerTabletNum); - LOG.info("finished to check tablets. unhealth/total/added: {}/{}/{}, cost: {} ms", - unhealthyTabletNum, totalTabletNum, addToSchedulerTabletNum, cost); + LOG.info("finished to check tablets. unhealth/total/added/in_sched/not_ready: {}/{}/{}/{}/{}, cost: {} ms", + unhealthyTabletNum, totalTabletNum, addToSchedulerTabletNum, tabletInScheduler, tabletNotReady, cost); } private boolean isInPrios(long dbId, long tblId, long partId) { diff --git a/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index a444c18aa5..303bcda2ee 100644 --- a/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -126,7 +126,8 @@ public class TabletSchedCtx implements Comparable { RUNNING, // tablet is being scheduled FINISHED, // task is finished CANCELLED, // task is failed - TIMEOUT // task is timeout + TIMEOUT, // task is timeout + UNEXPECTED // other unexpected errors } private Type type; @@ -601,8 +602,10 @@ public class TabletSchedCtx implements Comparable { visibleVersion, visibleVersionHash); cloneTask.setPathHash(srcPathHash, destPathHash); - if (tabletStatus == TabletStatus.REPLICA_MISSING || tabletStatus == TabletStatus.REPLICA_MISSING_IN_CLUSTER) { - // only these 2 status need to create a new replica. + // if this is a balance task, or this is a repair task with REPLICA_MISSING or REPLICA_MISSING_IN_CLUSTER, + // we create a new replica with state CLONE + if (tabletStatus == TabletStatus.REPLICA_MISSING || tabletStatus == TabletStatus.REPLICA_MISSING_IN_CLUSTER + || type == Type.BALANCE) { Replica cloneReplica = new Replica( Catalog.getCurrentCatalog().getNextId(), destBackendId, -1 /* version */, 0 /* version hash */, schemaHash, @@ -614,6 +617,7 @@ public class TabletSchedCtx implements Comparable { // addReplica() method will add this replica to tablet inverted index too. tablet.addReplica(cloneReplica); } else if (tabletStatus == TabletStatus.VERSION_INCOMPLETE) { + Preconditions.checkState(type == Type.REPAIR, type); // double check Replica replica = tablet.getReplicaByBackendId(destBackendId); if (replica == null) { @@ -660,6 +664,10 @@ public class TabletSchedCtx implements Comparable { throw new SchedException(Status.RUNNING_FAILED, request.getTask_status().getError_msgs().get(0)); } + if (!request.isSetFinish_tablet_infos() || request.getFinish_tablet_infos().isEmpty()) { + throw new SchedException(Status.RUNNING_FAILED, "tablet info is not set in task report request"); + } + // check task report if (dbId != cloneTask.getDbId() || tblId != cloneTask.getTableId() || partitionId != cloneTask.getPartitionId() || indexId != cloneTask.getIndexId() @@ -753,8 +761,6 @@ public class TabletSchedCtx implements Comparable { + replica.getLastFailedVersionHash() + " vs. " + reportedTablet.getVersion_hash()); } - // validate the replica - replica.setState(ReplicaState.NORMAL); replica.updateVersionInfo(reportedTablet.getVersion(), reportedTablet.getVersion_hash(), reportedTablet.getData_size(), reportedTablet.getRow_count()); @@ -771,7 +777,16 @@ public class TabletSchedCtx implements Comparable { replica.getLastFailedVersionHash(), replica.getLastSuccessVersion(), replica.getLastSuccessVersionHash()); - Catalog.getInstance().getEditLog().logAddReplica(info); + + if (replica.getState() == ReplicaState.CLONE) { + replica.setState(ReplicaState.NORMAL); + Catalog.getInstance().getEditLog().logAddReplica(info); + } else { + // if in VERSION_INCOMPLETE, replica is not newly created, thus the state is not CLONE + // so we keep it state unchanged, and log update replica + Catalog.getInstance().getEditLog().logUpdateReplica(info); + } + LOG.info("clone finished: {}", this); } catch (SchedException e) { // if failed to too many times, remove this task diff --git a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java index 3c14bf01f2..94ebd922e6 100644 --- a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -370,6 +370,11 @@ public class TabletScheduler extends Daemon { removeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getMessage()); } continue; + } catch (Exception e) { + LOG.warn("got unexpected exception, discard this schedule. tablet: {}", + tabletCtx.getTabletId(), e); + stat.counterTabletScheduledFailed.incrementAndGet(); + removeTabletCtx(tabletCtx, TabletSchedCtx.State.UNEXPECTED, e.getMessage()); } Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.RUNNING); @@ -894,6 +899,12 @@ public class TabletScheduler extends Daemon { removeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getMessage()); return true; } + } catch (Exception e) { + LOG.warn("got unexpected exception when finish clone task. tablet: {}", + tabletCtx.getTabletId(), e); + stat.counterTabletScheduledDiscard.incrementAndGet(); + removeTabletCtx(tabletCtx, TabletSchedCtx.State.UNEXPECTED, e.getMessage()); + return true; } Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.FINISHED); @@ -1068,8 +1079,14 @@ public class TabletScheduler extends Daemon { /* * If the specified 'pathHash' has available slot, decrease the slot number and return this path hash */ - public synchronized long takeSlot(long pathHash) { - Preconditions.checkArgument(pathHash != -1); + public synchronized long takeSlot(long pathHash) throws SchedException { + if (pathHash == -1) { + if (LOG.isDebugEnabled()) { + LOG.debug("path hash is not set.", new Exception()); + } + throw new SchedException(Status.SCHEDULE_FAILED, "path hash is not set"); + } + Slot slot = pathSlots.get(pathHash); if (slot == null) { return -1; diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index f2d05de68b..0a6b560b14 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -722,7 +722,7 @@ public class Config extends ConfigBase { * the default slot number per path in tablet scheduler * TODO(cmy): remove this config and dynamically adjust it by clone task statistic */ - @ConfField public static int schedule_slot_num_per_path = 1; + @ConfField public static int schedule_slot_num_per_path = 2; /* * set to true to use the TabletScheduler instead of the old CloneChecker. diff --git a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java index d09a153915..b0f0fa8d7e 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java @@ -17,27 +17,28 @@ package org.apache.doris.common.proc; -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.util.QueryStatisticsFormatter; import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.qe.QueryStatisticsItem; -import java.util.Comparator; -import java.util.List; -import java.util.Map; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; + +import java.util.Comparator; +import java.util.List; +import java.util.Map; /* * show proc "/current_queries" */ public class CurrentQueryStatisticsProcDir implements ProcDirInterface { private static final Logger LOG = LogManager.getLogger(CurrentQueryStatisticsProcDir.class); public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() - .add("ConnectionId").add("QueryId").add("Database").add("User") + .add("QueryId").add("ConnectionId").add("Database").add("User") .add("ScanBytes").add("ProcessRows").add("ExecTime").build(); private static final int EXEC_TIME_INDEX = 6; @@ -73,8 +74,8 @@ public class CurrentQueryStatisticsProcDir implements ProcDirInterface { = provider.getQueryStatistics(statistic.values()); for (QueryStatisticsItem item : statistic.values()) { final List values = Lists.newArrayList(); - values.add(item.getConnId()); values.add(item.getQueryId()); + values.add(item.getConnId()); values.add(item.getDb()); values.add(item.getUser()); final CurrentQueryInfoProvider.QueryStatistics statistics diff --git a/fe/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java index 1c1cb3f0a0..2540f22ac8 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java @@ -26,13 +26,10 @@ import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.util.ListComparator; import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.system.Backend; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -43,7 +40,7 @@ import java.util.List; */ public class TabletsProcDir implements ProcDirInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() - .add("TabletId").add("ReplicaId").add("BackendId").add("HostName").add("Version") + .add("TabletId").add("ReplicaId").add("BackendId").add("Version") .add("VersionHash").add("LstSuccessVersion").add("LstSuccessVersionHash") .add("LstFailedVersion").add("LstFailedVersionHash").add("LstFailedTime") .add("DataSize").add("RowCount").add("State") @@ -101,19 +98,6 @@ public class TabletsProcDir implements ProcDirInterface { tabletInfo.add(replica.getId()); long backendId = replica.getBackendId(); tabletInfo.add(replica.getBackendId()); - Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendId); - // backend may be dropped concurrently, ignore it. - if (backend == null) { - continue; - } - String hostName = null; - try { - InetAddress address = InetAddress.getByName(backend.getHost()); - hostName = address.getHostName(); - } catch (UnknownHostException e) { - continue; - } - tabletInfo.add(hostName); tabletInfo.add(replica.getVersion()); tabletInfo.add(replica.getVersionHash()); tabletInfo.add(replica.getLastSuccessVersion()); diff --git a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java index 3378241dbc..f4ba638946 100644 --- a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -180,10 +180,6 @@ public class JournalEntity implements Writable { needRead = false; break; } - case OperationType.OP_CLEAR_ROLLUP_INFO: { - data = new ReplicaPersistInfo(); - break; - } case OperationType.OP_DROP_ROLLUP: { data = new DropInfo(); break; @@ -248,8 +244,11 @@ public class JournalEntity implements Writable { break; } case OperationType.OP_ADD_REPLICA: - case OperationType.OP_DELETE_REPLICA: { - data = new ReplicaPersistInfo(); + case OperationType.OP_UPDATE_REPLICA: + case OperationType.OP_DELETE_REPLICA: + case OperationType.OP_CLEAR_ROLLUP_INFO: { + data = ReplicaPersistInfo.read(in); + needRead = false; break; } case OperationType.OP_ADD_BACKEND: diff --git a/fe/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/src/main/java/org/apache/doris/master/MasterImpl.java index 7d47c18331..750b6cab02 100644 --- a/fe/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/src/main/java/org/apache/doris/master/MasterImpl.java @@ -179,8 +179,6 @@ public class MasterImpl { finishRollup(task, finishTabletInfos); break; case CLONE: - checkHasTabletInfo(request); - finishTabletInfos = request.getFinish_tablet_infos(); finishClone(task, request); break; case CHECK_CONSISTENCY: @@ -341,10 +339,14 @@ public class MasterImpl { int schemaHash = tTabletInfo.getSchema_hash(); // during finishing stage, index's schema hash switched, when old schema hash finished // current index hash != old schema hash and alter job's new schema hash != old schema hash - // the check replcia will failed + // the check replica will failed // should use tabletid not pushTabletid because in rollup state, the push tabletid != tabletid - // and tabletmeta will not contain rollupindex's schema hash + // and tablet meta will not contain rollupindex's schema hash TabletMeta tabletMeta = Catalog.getCurrentInvertedIndex().getTabletMeta(tabletId); + if (tabletMeta == null) { + // rollup may be dropped + throw new MetaNotFoundException("tablet " + tabletId + " does not exist"); + } if (!tabletMeta.containsSchemaHash(schemaHash)) { throw new MetaNotFoundException("tablet[" + tabletId + "] schemaHash is not equal to index's switchSchemaHash. " diff --git a/fe/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/src/main/java/org/apache/doris/master/ReportHandler.java index 324e35661d..a5e7a68b34 100644 --- a/fe/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/src/main/java/org/apache/doris/master/ReportHandler.java @@ -284,7 +284,7 @@ public class ReportHandler extends Daemon { AgentBatchTask batchTask = new AgentBatchTask(); for (AgentTask task : diffTasks) { - // these tasks donot need to do diff + // these tasks no need to do diff // 1. CREATE // 2. SYNC DELETE // 3. CHECK_CONSISTENCY @@ -352,7 +352,7 @@ public class ReportHandler extends Daemon { if (index == null) { continue; } - long schemaHash = olapTable.getSchemaHashByIndexId(indexId); + int schemaHash = olapTable.getSchemaHashByIndexId(indexId); Tablet tablet = index.getTablet(tabletId); if (tablet == null) { @@ -400,9 +400,23 @@ public class ReportHandler extends Daemon { continue; } - // happens when PUSH finished in BE but failed or not yet report to FE + // happens when + // 1. PUSH finished in BE but failed or not yet report to FE + // 2. repair for VERSION_INCOMPLETE finished in BE, but failed or not yet report to FE replica.updateVersionInfo(backendVersion, backendVersionHash, dataSize, rowCount); + if (replica.getLastFailedVersion() < 0) { + // last failed version < 0 means this replica becomes health after sync, + // so we write an edit log to sync this operation + ReplicaPersistInfo info = ReplicaPersistInfo.createForClone(dbId, tableId, + partitionId, indexId, tabletId, backendId, replica.getId(), + replica.getVersion(), replica.getVersionHash(), schemaHash, + dataSize, rowCount, + replica.getLastFailedVersion(), replica.getLastFailedVersionHash(), + replica.getLastSuccessVersion(), replica.getLastSuccessVersionHash()); + Catalog.getInstance().getEditLog().logUpdateReplica(info); + } + ++syncCounter; LOG.debug("sync replica {} of tablet {} in backend {} in db {}.", replica.getId(), tabletId, backendId, dbId); diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java b/fe/src/main/java/org/apache/doris/persist/EditLog.java index e3ecc59329..8b7227768b 100644 --- a/fe/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java @@ -396,6 +396,11 @@ public class EditLog { catalog.replayAddReplica(info); break; } + case OperationType.OP_UPDATE_REPLICA: { + ReplicaPersistInfo info = (ReplicaPersistInfo) journal.getData(); + catalog.replayUpdateReplica(info); + break; + } case OperationType.OP_DELETE_REPLICA: { ReplicaPersistInfo info = (ReplicaPersistInfo) journal.getData(); catalog.replayDeleteReplica(info); @@ -913,6 +918,10 @@ public class EditLog { logEdit(OperationType.OP_ADD_REPLICA, info); } + public void logUpdateReplica(ReplicaPersistInfo info) { + logEdit(OperationType.OP_UPDATE_REPLICA, info); + } + public void logDeleteReplica(ReplicaPersistInfo info) { logEdit(OperationType.OP_DELETE_REPLICA, info); } diff --git a/fe/src/main/java/org/apache/doris/persist/OperationType.java b/fe/src/main/java/org/apache/doris/persist/OperationType.java index a0c0244880..eaf8cde0aa 100644 --- a/fe/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/src/main/java/org/apache/doris/persist/OperationType.java @@ -84,6 +84,7 @@ public class OperationType { public static final short OP_ADD_REPLICA = 42; public static final short OP_DELETE_REPLICA = 43; public static final short OP_FINISH_ASYNC_DELETE = 44; + public static final short OP_UPDATE_REPLICA = 45; public static final short OP_ADD_BACKEND = 50; public static final short OP_DROP_BACKEND = 51; diff --git a/fe/src/main/java/org/apache/doris/persist/ReplicaPersistInfo.java b/fe/src/main/java/org/apache/doris/persist/ReplicaPersistInfo.java index 897e14e3de..60005124b8 100644 --- a/fe/src/main/java/org/apache/doris/persist/ReplicaPersistInfo.java +++ b/fe/src/main/java/org/apache/doris/persist/ReplicaPersistInfo.java @@ -190,14 +190,12 @@ public class ReplicaPersistInfo implements Writable { public static ReplicaPersistInfo createForClearRollupInfo(long dbId, long tableId, long partitionId, long indexId) { return new ReplicaPersistInfo(ReplicaOperationType.CLEAR_ROLLUPINFO, - dbId, tableId, partitionId, indexId, - -1L, -1L, -1L, -1L, -1L, -1, - -1L, -1L, -1L, 0L, -1L, 0L); + dbId, tableId, partitionId, indexId, -1L, -1L, -1L, -1L, -1L, -1, -1L, -1L, -1L, 0L, -1L, 0L); } - public ReplicaPersistInfo() { + private ReplicaPersistInfo() { } - + private ReplicaPersistInfo(ReplicaOperationType opType, long dbId, long tableId, long partitionId, long indexId, long tabletId, long backendId, long replicaId, long version, long versionHash, int schemaHash, long dataSize, long rowCount, long lastFailedVersion, long lastFailedVersionHash, diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 55797aea9d..b022debb86 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -1053,8 +1053,8 @@ public class GlobalTransactionMgr { long versionHash = partitionCommitInfo.getVersionHash(); partition.updateVisibleVersionAndVersionHash(version, versionHash); if (LOG.isDebugEnabled()) { - LOG.debug("transaction state {} set partition's version to [{}] and version hash to [{}]", - transactionState, version, versionHash); + LOG.debug("transaction state {} set partition {}'s version to [{}] and version hash to [{}]", + transactionState, partition.getId(), version, versionHash); } } } diff --git a/fe/src/test/java/org/apache/doris/persist/ReplicaPersistInfoTest.java b/fe/src/test/java/org/apache/doris/persist/ReplicaPersistInfoTest.java index 67988aaaf3..85ce8fa289 100644 --- a/fe/src/test/java/org/apache/doris/persist/ReplicaPersistInfoTest.java +++ b/fe/src/test/java/org/apache/doris/persist/ReplicaPersistInfoTest.java @@ -50,8 +50,7 @@ public class ReplicaPersistInfoTest { // 2. Read objects from file DataInputStream dis = new DataInputStream(new FileInputStream(file)); - ReplicaPersistInfo rInfo2 = new ReplicaPersistInfo(); - rInfo2.readFields(dis); + ReplicaPersistInfo rInfo2 = ReplicaPersistInfo.read(dis); // 3. delete files dis.close(); diff --git a/gensrc/script/gen_build_version.sh b/gensrc/script/gen_build_version.sh index 82cf00e22e..7fecc1dfd9 100755 --- a/gensrc/script/gen_build_version.sh +++ b/gensrc/script/gen_build_version.sh @@ -25,7 +25,7 @@ # contains the build version based on the git hash or svn revision. ############################################################## -build_version="3.3-branch" +build_version="0.9-branch" unset LANG unset LC_CTYPE