diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java index a970d1798d..3a1905bd8e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java @@ -282,6 +282,14 @@ public class Partition extends MetaObject implements Writable { return replicaCount; } + public long getAllReplicaCount() { + long replicaCount = 0; + for (MaterializedIndex mIndex : getMaterializedIndices(IndexExtState.ALL)) { + replicaCount += mIndex.getReplicaCount(); + } + return replicaCount; + } + public boolean hasData() { // The fe unit test need to check the selected index id without any data. // So if set FeConstants.runningUnitTest, we can ensure that the number of partitions is not empty, diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java index dbf303f10f..ea3f786d0f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java @@ -17,78 +17,26 @@ package org.apache.doris.load; -import org.apache.doris.analysis.BinaryPredicate; -import org.apache.doris.analysis.DateLiteral; import org.apache.doris.analysis.DeleteStmt; -import org.apache.doris.analysis.InPredicate; -import org.apache.doris.analysis.IsNullPredicate; -import org.apache.doris.analysis.LiteralExpr; -import org.apache.doris.analysis.Predicate; -import org.apache.doris.analysis.SlotRef; -import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.KeysType; -import org.apache.doris.catalog.MaterializedIndex; -import org.apache.doris.catalog.MaterializedIndex.IndexExtState; -import org.apache.doris.catalog.MaterializedIndexMeta; import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Partition; -import org.apache.doris.catalog.PartitionInfo; -import org.apache.doris.catalog.PartitionItem; -import org.apache.doris.catalog.PartitionType; -import org.apache.doris.catalog.PrimitiveType; -import org.apache.doris.catalog.Replica; -import org.apache.doris.catalog.ScalarType; -import org.apache.doris.catalog.Table; -import org.apache.doris.catalog.Tablet; -import org.apache.doris.catalog.TabletInvertedIndex; -import org.apache.doris.catalog.Type; -import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; -import org.apache.doris.common.ErrorCode; -import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeConstants; -import org.apache.doris.common.MarkedCountDownLatch; -import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.ListComparator; import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.load.DeleteJob.DeleteState; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.gson.GsonUtils; -import org.apache.doris.planner.ColumnBound; -import org.apache.doris.planner.ColumnRange; -import org.apache.doris.planner.ListPartitionPrunerV2; -import org.apache.doris.planner.PartitionPruner; -import org.apache.doris.planner.RangePartitionPrunerV2; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.QueryState.MysqlStateType; -import org.apache.doris.qe.QueryStateException; -import org.apache.doris.service.FrontendOptions; -import org.apache.doris.task.AgentBatchTask; -import org.apache.doris.task.AgentTaskExecutor; -import org.apache.doris.task.AgentTaskQueue; -import org.apache.doris.task.PushTask; -import org.apache.doris.thrift.TColumn; -import org.apache.doris.thrift.TPriority; -import org.apache.doris.thrift.TPushType; -import org.apache.doris.thrift.TTaskType; -import org.apache.doris.transaction.GlobalTransactionMgr; -import org.apache.doris.transaction.TabletCommitInfo; +import org.apache.doris.qe.QueryState; import org.apache.doris.transaction.TransactionState; -import org.apache.doris.transaction.TransactionState.TxnCoordinator; -import org.apache.doris.transaction.TransactionState.TxnSourceType; -import org.apache.doris.transaction.TransactionStatus; import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Range; import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -97,18 +45,12 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; public class DeleteHandler implements Writable { private static final Logger LOG = LogManager.getLogger(DeleteHandler.class); @@ -128,13 +70,6 @@ public class DeleteHandler implements Writable { lock = new ReentrantReadWriteLock(); } - private enum CancelType { - METADATA_MISSING, - TIMEOUT, - COMMIT_FAIL, - UNKNOWN - } - public void readLock() { lock.readLock().lock(); } @@ -151,258 +86,44 @@ public class DeleteHandler implements Writable { lock.writeLock().unlock(); } - public void process(DeleteStmt stmt) throws DdlException, QueryStateException { - String dbName = stmt.getDbName(); - String tableName = stmt.getTableName(); - List partitionNames = stmt.getPartitionNames(); - boolean noPartitionSpecified = partitionNames.isEmpty(); - List conditions = stmt.getDeleteConditions(); - Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName); - + public void process(DeleteStmt stmt, QueryState execState) throws DdlException { + Database targetDb = Env.getCurrentInternalCatalog().getDbOrDdlException(stmt.getDbName()); + OlapTable targetTbl = targetDb.getOlapTableOrDdlException(stmt.getTableName()); DeleteJob deleteJob = null; try { - MarkedCountDownLatch countDownLatch; - long transactionId = -1; - OlapTable olapTable = db.getOlapTableOrDdlException(tableName); - olapTable.readLock(); + targetTbl.readLock(); try { - if (olapTable.getState() != OlapTable.OlapTableState.NORMAL) { + if (targetTbl.getState() != OlapTable.OlapTableState.NORMAL) { // table under alter operation can also do delete. // just add a comment here to notice. } + deleteJob = DeleteJob.newBuilder() + .buildWith(new DeleteJob.BuildParams( + targetDb, + targetTbl, + stmt.getPartitionNames(), + stmt.getDeleteConditions())); - if (noPartitionSpecified) { - // Try to get selected partitions if no partition specified in delete statement - // Use PartitionPruner to generate the select partitions - if (olapTable.getPartitionInfo().getType() == PartitionType.RANGE - || olapTable.getPartitionInfo().getType() == PartitionType.LIST) { - Set partitionColumnNameSet = olapTable.getPartitionColumnNames(); - Map columnNameToRange = Maps.newHashMap(); - for (String colName : partitionColumnNameSet) { - ColumnRange columnRange = createColumnRange(olapTable, colName, conditions); - // Not all partition columns are involved in predicate conditions - if (columnRange != null) { - columnNameToRange.put(colName, columnRange); - } - } - - Collection selectedPartitionId = null; - if (!columnNameToRange.isEmpty()) { - PartitionInfo partitionInfo = olapTable.getPartitionInfo(); - Map keyItemMap = partitionInfo.getIdToItem(false); - PartitionPruner pruner = olapTable.getPartitionInfo().getType() == PartitionType.RANGE - ? new RangePartitionPrunerV2(keyItemMap, partitionInfo.getPartitionColumns(), - columnNameToRange) - : new ListPartitionPrunerV2(keyItemMap, partitionInfo.getPartitionColumns(), - columnNameToRange); - selectedPartitionId = pruner.prune(); - } - // selectedPartitionId is empty means no partition matches conditions. - // How to return empty set in such case? - if (selectedPartitionId != null && !selectedPartitionId.isEmpty()) { - for (long partitionId : selectedPartitionId) { - partitionNames.add(olapTable.getPartition(partitionId).getName()); - } - } else { - if (!ConnectContext.get().getSessionVariable().isDeleteWithoutPartition()) { - throw new DdlException("This is a range or list partitioned table." - + " You should specify partition in delete stmt," - + " or set delete_without_partition to true"); - } else { - partitionNames.addAll(olapTable.getPartitionNames()); - } - } - } else if (olapTable.getPartitionInfo().getType() == PartitionType.UNPARTITIONED) { - // this is a unpartitioned table, use table name as partition name - partitionNames.add(olapTable.getName()); - } else { - throw new DdlException("Unknown partition type: " + olapTable.getPartitionInfo().getType()); - } - } - - Map partitionReplicaNum = Maps.newHashMap(); - List partitions = Lists.newArrayList(); - for (String partName : partitionNames) { - Partition partition = olapTable.getPartition(partName); - if (partition == null) { - throw new DdlException("Partition does not exist. name: " + partName); - } - partitions.add(partition); - partitionReplicaNum.put(partition.getId(), - olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum()); - } - - List deleteConditions = Lists.newArrayList(); - - // pre check - checkDeleteV2(olapTable, partitions, conditions, deleteConditions); - - // generate label - String label = "delete_" + UUID.randomUUID(); - //generate jobId - long jobId = Env.getCurrentEnv().getNextId(); - // begin txn here and generate txn id - transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(), - Lists.newArrayList(olapTable.getId()), label, null, - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), - TransactionState.LoadJobSourceType.FRONTEND, jobId, Config.stream_load_default_timeout_second); - - - DeleteInfo deleteInfo = new DeleteInfo(db.getId(), olapTable.getId(), tableName, deleteConditions); - deleteInfo.setPartitions(noPartitionSpecified, partitions.stream().map(Partition::getId) - .collect(Collectors.toList()), partitionNames); - deleteJob = new DeleteJob(jobId, transactionId, label, partitionReplicaNum, deleteInfo); - idToDeleteJob.put(deleteJob.getTransactionId(), deleteJob); - - Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(deleteJob); + long txnId = deleteJob.beginTxn(); TransactionState txnState = Env.getCurrentGlobalTransactionMgr() - .getTransactionState(db.getId(), transactionId); + .getTransactionState(targetDb.getId(), txnId); // must call this to make sure we only handle the tablet in the mIndex we saw here. - // table may be under schema changge or rollup, and the newly created tablets will not be checked later, + // table may be under schema change or rollup, and the newly created tablets will not be checked later, // to make sure that the delete transaction can be done successfully. - txnState.addTableIndexes(olapTable); - - // task sent to be - AgentBatchTask batchTask = new AgentBatchTask(); - // count total replica num - // Get ALL materialized indexes, because delete condition will be applied to all indexes - int totalReplicaNum = 0; - for (Partition partition : partitions) { - for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) { - for (Tablet tablet : index.getTablets()) { - totalReplicaNum += tablet.getReplicas().size(); - } - } - } - countDownLatch = new MarkedCountDownLatch(totalReplicaNum); - - for (Partition partition : partitions) { - for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) { - long indexId = index.getId(); - int schemaHash = olapTable.getSchemaHashByIndexId(indexId); - - List columnsDesc = new ArrayList(); - for (Column column : olapTable.getSchemaByIndexId(indexId)) { - columnsDesc.add(column.toThrift()); - } - - for (Tablet tablet : index.getTablets()) { - long tabletId = tablet.getId(); - - // set push type - TPushType type = TPushType.DELETE; - - for (Replica replica : tablet.getReplicas()) { - long replicaId = replica.getId(); - long backendId = replica.getBackendId(); - countDownLatch.addMark(backendId, tabletId); - - // create push task for each replica - PushTask pushTask = new PushTask(null, - replica.getBackendId(), db.getId(), olapTable.getId(), - partition.getId(), indexId, - tabletId, replicaId, schemaHash, - -1, "", -1, 0, - -1, type, conditions, - true, TPriority.NORMAL, - TTaskType.REALTIME_PUSH, - transactionId, - Env.getCurrentGlobalTransactionMgr() - .getTransactionIDGenerator().getNextTransactionId(), - columnsDesc); - pushTask.setIsSchemaChanging(false); - pushTask.setCountDownLatch(countDownLatch); - - if (AgentTaskQueue.addTask(pushTask)) { - batchTask.addTask(pushTask); - deleteJob.addPushTask(pushTask); - deleteJob.addTablet(tabletId); - } - } - } - } - } - - // submit push tasks - if (batchTask.getTaskNum() > 0) { - AgentTaskExecutor.submit(batchTask); - } - - } catch (Throwable t) { - LOG.warn("error occurred during delete process", t); - // if transaction has been begun, need to abort it - if (Env.getCurrentGlobalTransactionMgr().getTransactionState(db.getId(), transactionId) != null) { - cancelJob(deleteJob, CancelType.UNKNOWN, t.getMessage()); - } - throw new DdlException(t.getMessage(), t); + txnState.addTableIndexes(targetTbl); + idToDeleteJob.put(txnId, deleteJob); + deleteJob.dispatch(); } finally { - olapTable.readUnlock(); + targetTbl.readUnlock(); } - - long timeoutMs = deleteJob.getTimeoutMs(); - LOG.info("waiting delete Job finish, signature: {}, timeout: {}", transactionId, timeoutMs); - boolean ok = false; - try { - ok = countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - LOG.warn("InterruptedException: ", e); - ok = false; - } - - if (!ok) { - String errMsg = ""; - List> unfinishedMarks = countDownLatch.getLeftMarks(); - // only show at most 5 results - List> subList = unfinishedMarks.subList(0, Math.min(unfinishedMarks.size(), 5)); - if (!subList.isEmpty()) { - errMsg = "unfinished replicas [BackendId=TabletId]: " + Joiner.on(", ").join(subList); - } - LOG.warn(errMsg); - - try { - deleteJob.checkAndUpdateQuorum(); - } catch (MetaNotFoundException e) { - cancelJob(deleteJob, CancelType.METADATA_MISSING, e.getMessage()); - throw new DdlException(e.getMessage(), e); - } - DeleteState state = deleteJob.getState(); - switch (state) { - case UN_QUORUM: - LOG.warn("delete job timeout: transactionId {}, timeout {}, {}", - transactionId, timeoutMs, errMsg); - cancelJob(deleteJob, CancelType.TIMEOUT, "delete job timeout"); - throw new DdlException("failed to execute delete. transaction id " + transactionId - + ", timeout(ms) " + timeoutMs + ", " + errMsg); - case QUORUM_FINISHED: - case FINISHED: - try { - long nowQuorumTimeMs = System.currentTimeMillis(); - long endQuorumTimeoutMs = nowQuorumTimeMs + timeoutMs / 2; - // if job's state is quorum_finished then wait for a period of time and commit it. - while (deleteJob.getState() == DeleteState.QUORUM_FINISHED - && endQuorumTimeoutMs > nowQuorumTimeMs) { - deleteJob.checkAndUpdateQuorum(); - Thread.sleep(1000); - nowQuorumTimeMs = System.currentTimeMillis(); - LOG.debug("wait for quorum finished delete job: {}, txn id: {}", - deleteJob.getId(), transactionId); - } - } catch (MetaNotFoundException e) { - cancelJob(deleteJob, CancelType.METADATA_MISSING, e.getMessage()); - throw new DdlException(e.getMessage(), e); - } catch (InterruptedException e) { - cancelJob(deleteJob, CancelType.UNKNOWN, e.getMessage()); - throw new DdlException(e.getMessage(), e); - } - commitJob(deleteJob, db, olapTable, timeoutMs); - break; - default: - Preconditions.checkState(false, "wrong delete job state: " + state.name()); - break; - } - } else { - commitJob(deleteJob, db, olapTable, timeoutMs); + deleteJob.await(); + String commitMsg = deleteJob.commit(); + execState.setOk(0, 0, commitMsg); + } catch (Exception ex) { + if (deleteJob != null) { + deleteJob.cancel(ex.getMessage()); } + execState.setError(ex.getMessage()); } finally { if (!FeConstants.runningUnitTest) { clearJob(deleteJob); @@ -410,176 +131,21 @@ public class DeleteHandler implements Writable { } } - // Return null if there is no filter for the partition column - private ColumnRange createColumnRange(OlapTable table, String colName, List conditions) - throws AnalysisException { - ColumnRange result = ColumnRange.create(); - Type type = - table.getBaseSchema().stream().filter(c -> c.getName().equalsIgnoreCase(colName)) - .findFirst().get().getType(); - - boolean hasRange = false; - for (Predicate predicate : conditions) { - List> bounds = createColumnRange(colName, predicate, type); - if (bounds != null) { - hasRange = true; - result.intersect(bounds); - } - } - if (hasRange) { - return result; - } else { - return null; - } - } - - // Return null if the condition is not related to the partition column, - // or the operator is not supported. - private List> createColumnRange(String colName, Predicate condition, Type type) - throws AnalysisException { - List> result = Lists.newLinkedList(); - if (condition instanceof BinaryPredicate) { - BinaryPredicate binaryPredicate = (BinaryPredicate) condition; - if (!(binaryPredicate.getChild(0) instanceof SlotRef)) { - return null; - } - String columnName = ((SlotRef) binaryPredicate.getChild(0)).getColumnName(); - if (!colName.equalsIgnoreCase(columnName)) { - return null; - } - ColumnBound bound = ColumnBound.of( - LiteralExpr.create(((LiteralExpr) binaryPredicate.getChild(1)).getStringValue(), type)); - switch (binaryPredicate.getOp()) { - case EQ: - result.add(Range.closed(bound, bound)); - break; - case GE: - result.add(Range.atLeast(bound)); - break; - case GT: - result.add(Range.greaterThan(bound)); - break; - case LT: - result.add(Range.lessThan(bound)); - break; - case LE: - result.add(Range.atMost(bound)); - break; - case NE: - result.add(Range.lessThan(bound)); - result.add(Range.greaterThan(bound)); - break; - default: - return null; - } - } else if (condition instanceof InPredicate) { - InPredicate inPredicate = (InPredicate) condition; - if (!(inPredicate.getChild(0) instanceof SlotRef)) { - return null; - } - String columnName = ((SlotRef) inPredicate.getChild(0)).getColumnName(); - if (!colName.equals(columnName)) { - return null; - } - if (inPredicate.isNotIn()) { - return null; - } - for (int i = 1; i <= inPredicate.getInElementNum(); i++) { - ColumnBound bound = ColumnBound.of(LiteralExpr - .create(((LiteralExpr) inPredicate.getChild(i)).getStringValue(), type)); - result.add(Range.closed(bound, bound)); - } - } else { - return null; - } - return result; - } - - private void commitJob(DeleteJob job, Database db, Table table, long timeoutMs) - throws DdlException, QueryStateException { - TransactionStatus status = TransactionStatus.UNKNOWN; - try { - boolean isVisible = unprotectedCommitJob(job, db, table, timeoutMs); - status = isVisible ? TransactionStatus.VISIBLE : TransactionStatus.COMMITTED; - } catch (UserException e) { - if (cancelJob(job, CancelType.COMMIT_FAIL, e.getMessage())) { - throw new DdlException(e.getMessage(), e); - } - } - - StringBuilder sb = new StringBuilder(); - sb.append("{'label':'").append(job.getLabel()).append("', 'status':'").append(status.name()); - sb.append("', 'txnId':'").append(job.getTransactionId()).append("'"); - - switch (status) { - case COMMITTED: { - // Although publish is unfinished we should tell user that commit already success. - String errMsg = "delete job is committed but may be taking effect later"; - sb.append(", 'err':'").append(errMsg).append("'"); - sb.append("}"); - throw new QueryStateException(MysqlStateType.OK, sb.toString()); - } - case VISIBLE: { - sb.append("}"); - throw new QueryStateException(MysqlStateType.OK, sb.toString()); - } - default: - Preconditions.checkState(false, "wrong transaction status: " + status.name()); - break; - } - } - - /** - * unprotected commit delete job - * return true when successfully commit and publish - * return false when successfully commit but publish unfinished. - * A UserException thrown if both commit and publish failed. - * @param job - * @param db - * @param timeoutMs - * @return - * @throws UserException - */ - private boolean unprotectedCommitJob(DeleteJob job, Database db, Table table, long timeoutMs) throws UserException { - long transactionId = job.getTransactionId(); - GlobalTransactionMgr globalTransactionMgr = Env.getCurrentGlobalTransactionMgr(); - List tabletCommitInfos = new ArrayList(); - TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); - for (TabletDeleteInfo tDeleteInfo : job.getTabletDeleteInfo()) { - for (Replica replica : tDeleteInfo.getFinishedReplicas()) { - // the inverted index contains rolling up replica - Long tabletId = invertedIndex.getTabletIdByReplica(replica.getId()); - if (tabletId == null) { - LOG.warn("could not find tablet id for replica {}, the tablet maybe dropped", replica); - continue; - } - tabletCommitInfos.add(new TabletCommitInfo(tabletId, replica.getBackendId())); - } - } - return globalTransactionMgr.commitAndPublishTransaction(db, Lists.newArrayList(table), - transactionId, tabletCommitInfos, timeoutMs); - } - /** * This method should always be called in the end of the delete process to clean the job. * Better put it in finally block. + * * @param job */ private void clearJob(DeleteJob job) { - if (job != null) { - long signature = job.getTransactionId(); - if (idToDeleteJob.containsKey(signature)) { - idToDeleteJob.remove(signature); - } - for (PushTask pushTask : job.getPushTasks()) { - AgentTaskQueue.removePushTask(pushTask.getBackendId(), pushTask.getSignature(), - pushTask.getVersion(), - pushTask.getPushType(), pushTask.getTaskType()); - } - - // NOT remove callback from GlobalTransactionMgr's callback factory here. - // the callback will be removed after transaction is aborted of visible. + if (job == null) { + return; } + long signature = job.getTransactionId(); + idToDeleteJob.remove(signature); + job.cleanUp(); + // do not remove callback from GlobalTransactionMgr's callback factory here. + // the callback will be removed after transaction is aborted or visible. } public void recordFinishedJob(DeleteJob job) { @@ -598,250 +164,13 @@ public class DeleteHandler implements Writable { } } - /** - * abort delete job - * return true when successfully abort. - * return true when some unknown error happened, just ignore it. - * return false when the job is already committed - * @param job - * @param cancelType - * @param reason - * @return - */ - public boolean cancelJob(DeleteJob job, CancelType cancelType, String reason) { - LOG.info("start to cancel delete job, transactionId: {}, cancelType: {}", - job.getTransactionId(), cancelType.name()); - GlobalTransactionMgr globalTransactionMgr = Env.getCurrentGlobalTransactionMgr(); - try { - if (job != null) { - globalTransactionMgr.abortTransaction(job.getDeleteInfo().getDbId(), job.getTransactionId(), reason); - } - } catch (Exception e) { - TransactionState state = globalTransactionMgr.getTransactionState( - job.getDeleteInfo().getDbId(), job.getTransactionId()); - if (state == null) { - LOG.warn("cancel delete job failed because txn not found, transactionId: {}", - job.getTransactionId()); - } else if (state.getTransactionStatus() == TransactionStatus.COMMITTED - || state.getTransactionStatus() == TransactionStatus.VISIBLE) { - LOG.warn("cancel delete job failed because it has been committed, transactionId: {}", - job.getTransactionId()); - return false; - } else { - LOG.warn("errors while abort transaction", e); - } - } - return true; - } - public DeleteJob getDeleteJob(long transactionId) { return idToDeleteJob.get(transactionId); } - private SlotRef getSlotRef(Predicate condition) { - SlotRef slotRef = null; - if (condition instanceof BinaryPredicate) { - BinaryPredicate binaryPredicate = (BinaryPredicate) condition; - slotRef = (SlotRef) binaryPredicate.getChild(0); - } else if (condition instanceof IsNullPredicate) { - IsNullPredicate isNullPredicate = (IsNullPredicate) condition; - slotRef = (SlotRef) isNullPredicate.getChild(0); - } else if (condition instanceof InPredicate) { - InPredicate inPredicate = (InPredicate) condition; - slotRef = (SlotRef) inPredicate.getChild(0); - } - return slotRef; - } - - private void checkDeleteV2(OlapTable table, List partitions, - List conditions, List deleteConditions) - throws DdlException { - // check condition column is key column and condition value - // Here we use "getFullSchema()" to get all columns including VISIBLE and SHADOW columns - Map nameToColumn = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); - for (Column column : table.getFullSchema()) { - nameToColumn.put(column.getName(), column); - } - - for (Predicate condition : conditions) { - SlotRef slotRef = getSlotRef(condition); - String columnName = slotRef.getColumnName(); - if (!nameToColumn.containsKey(columnName)) { - ErrorReport.reportDdlException(ErrorCode.ERR_BAD_FIELD_ERROR, columnName, table.getName()); - } - - if (Column.isShadowColumn(columnName)) { - ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, - "Can not apply delete condition to shadow column"); - } - - // Check if this column is under schema change, if yes, there will be a shadow column related to it. - // And we don't allow doing delete operation when a condition column is under schema change. - String shadowColName = Column.getShadowName(columnName); - if (nameToColumn.containsKey(shadowColName)) { - ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Column " + columnName + " is under" - + " schema change operation. Do not allow delete operation"); - } - - Column column = nameToColumn.get(columnName); - // Due to rounding errors, most floating-point numbers end up being slightly imprecise, - // it also means that numbers expected to be equal often differ slightly, so we do not allow compare with - // floating-point numbers, floating-point number not allowed in where clause - if (column.getDataType().isFloatingPointType()) { - throw new DdlException("Column[" + columnName + "] type is float or double."); - } - if (!column.isKey()) { - if (table.getKeysType() == KeysType.AGG_KEYS) { - throw new DdlException("delete predicate on value column only supports Unique table with" - + " merge-on-write enabled and Duplicate table, but " + "Table[" + table.getName() - + "] is an Aggregate table."); - } else if (table.getKeysType() == KeysType.UNIQUE_KEYS && !table.getEnableUniqueKeyMergeOnWrite()) { - throw new DdlException("delete predicate on value column only supports Unique table with" - + " merge-on-write enabled and Duplicate table, but " + "Table[" + table.getName() - + "] is an Aggregate table."); - } - } - - if (condition instanceof BinaryPredicate) { - String value = null; - try { - BinaryPredicate binaryPredicate = (BinaryPredicate) condition; - // if a bool cond passed to be, be's zone_map cannot handle bool correctly, - // change it to a tinyint type here; - value = ((LiteralExpr) binaryPredicate.getChild(1)).getStringValue(); - if (column.getDataType() == PrimitiveType.BOOLEAN) { - if (value.toLowerCase().equals("true")) { - binaryPredicate.setChild(1, LiteralExpr.create("1", Type.TINYINT)); - } else if (value.toLowerCase().equals("false")) { - binaryPredicate.setChild(1, LiteralExpr.create("0", Type.TINYINT)); - } - } else if (column.getDataType() == PrimitiveType.DATE - || column.getDataType() == PrimitiveType.DATETIME - || column.getDataType() == PrimitiveType.DATEV2) { - DateLiteral dateLiteral = new DateLiteral(value, Type.fromPrimitiveType(column.getDataType())); - value = dateLiteral.getStringValue(); - binaryPredicate.setChild(1, LiteralExpr.create(value, - Type.fromPrimitiveType(column.getDataType()))); - } else if (column.getDataType() == PrimitiveType.DATETIMEV2) { - DateLiteral dateLiteral = new DateLiteral(value, - ScalarType.createDatetimeV2Type(ScalarType.MAX_DATETIMEV2_SCALE)); - value = dateLiteral.getStringValue(); - binaryPredicate.setChild(1, LiteralExpr.create(value, - ScalarType.createDatetimeV2Type(ScalarType.MAX_DATETIMEV2_SCALE))); - } - LiteralExpr.create(value, column.getType()); - } catch (AnalysisException e) { - // ErrorReport.reportDdlException(ErrorCode.ERR_INVALID_VALUE, value); - throw new DdlException("Invalid column value[" + value + "] for column " + columnName); - } - } else if (condition instanceof InPredicate) { - String value = null; - try { - InPredicate inPredicate = (InPredicate) condition; - for (int i = 1; i <= inPredicate.getInElementNum(); i++) { - value = inPredicate.getChild(i).getStringValue(); - if (column.getDataType() == PrimitiveType.DATE - || column.getDataType() == PrimitiveType.DATETIME - || column.getDataType() == PrimitiveType.DATEV2 - || column.getDataType() == PrimitiveType.DATETIMEV2) { - DateLiteral dateLiteral = new DateLiteral(value, - column.getType()); - value = dateLiteral.getStringValue(); - inPredicate.setChild(i, LiteralExpr.create(value, - column.getType())); - } else { - LiteralExpr.create(value, - Type.fromPrimitiveType(column.getDataType())); - } - } - } catch (AnalysisException e) { - throw new DdlException("Invalid column value[" + value + "] for column " + columnName); - } - } - - // set schema column name - slotRef.setCol(column.getName()); - } - - // check materialized index. - // only need to check the first partition, because each partition has same materialized views - Map> indexIdToSchema = table.getIndexIdToSchema(); - Partition partition = partitions.get(0); - // Here we check ALL materialized views instead of just VISIBLE ones. - // For example, when a table is doing rollup or schema change. there will be some SHADOW indexes. - // And we also need to check these SHADOW indexes to see if the delete condition can be applied to them. - for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) { - if (table.getBaseIndexId() == index.getId()) { - continue; - } - - // check table has condition column - Map indexColNameToColumn = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); - for (Column column : indexIdToSchema.get(index.getId())) { - indexColNameToColumn.put(column.getName(), column); - } - String indexName = table.getIndexNameById(index.getId()); - for (Predicate condition : conditions) { - SlotRef slotRef = getSlotRef(condition); - String columnName = slotRef.getColumnName(); - Column column = indexColNameToColumn.get(columnName); - if (column == null) { - ErrorReport.reportDdlException(ErrorCode.ERR_BAD_FIELD_ERROR, - columnName, "index[" + indexName + "]"); - } - MaterializedIndexMeta indexMeta = table.getIndexIdToMeta().get(index.getId()); - if (indexMeta.getKeysType() != KeysType.DUP_KEYS && !column.isKey()) { - throw new DdlException("Column[" + columnName + "] is not key column in index[" + indexName + "]"); - } - } - } - - if (deleteConditions == null) { - return; - } - - // save delete conditions - for (Predicate condition : conditions) { - if (condition instanceof BinaryPredicate) { - BinaryPredicate binaryPredicate = (BinaryPredicate) condition; - SlotRef slotRef = (SlotRef) binaryPredicate.getChild(0); - String columnName = slotRef.getColumnName(); - StringBuilder sb = new StringBuilder(); - sb.append(columnName).append(" ").append(binaryPredicate.getOp().name()).append(" \"") - .append(binaryPredicate.getChild(1).getStringValue()).append("\""); - deleteConditions.add(sb.toString()); - } else if (condition instanceof IsNullPredicate) { - IsNullPredicate isNullPredicate = (IsNullPredicate) condition; - SlotRef slotRef = (SlotRef) isNullPredicate.getChild(0); - String columnName = slotRef.getColumnName(); - StringBuilder sb = new StringBuilder(); - sb.append(columnName); - if (isNullPredicate.isNotNull()) { - sb.append(" IS NOT NULL"); - } else { - sb.append(" IS NULL"); - } - deleteConditions.add(sb.toString()); - } else if (condition instanceof InPredicate) { - InPredicate inPredicate = (InPredicate) condition; - SlotRef slotRef = (SlotRef) inPredicate.getChild(0); - String columnName = slotRef.getColumnName(); - StringBuilder strBuilder = new StringBuilder(); - String notStr = inPredicate.isNotIn() ? "NOT " : ""; - strBuilder.append(columnName).append(" ").append(notStr).append("IN ("); - for (int i = 1; i <= inPredicate.getInElementNum(); ++i) { - strBuilder.append(inPredicate.getChild(i).toSql()); - strBuilder.append((i != inPredicate.getInElementNum()) ? ", " : ""); - } - strBuilder.append(")"); - deleteConditions.add(strBuilder.toString()); - } - } - } - // show delete stmt public List> getDeleteInfosByDb(long dbId) { - LinkedList> infos = new LinkedList>(); + LinkedList> infos = new LinkedList<>(); Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); if (db == null) { return infos; @@ -896,8 +225,8 @@ public class DeleteHandler implements Writable { } // sort by createTimeMs - ListComparator> comparator = new ListComparator>(2); - Collections.sort(infos, comparator); + ListComparator> comparator = new ListComparator<>(2); + infos.sort(comparator); return infos; } @@ -919,7 +248,6 @@ public class DeleteHandler implements Writable { @Override public void write(DataOutput out) throws IOException { removeOldDeleteInfos(); - Text.writeString(out, GsonUtils.GSON.toJson(this)); } public static DeleteHandler read(DataInput in) throws IOException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteInfo.java index 300c67e58d..05c6c4b1a8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteInfo.java @@ -60,10 +60,6 @@ public class DeleteInfo implements Writable, GsonPostProcessable { @SerializedName(value = "partitionName") private String partitionName; - public DeleteInfo() { - this.deleteConditions = Lists.newArrayList(); - } - public DeleteInfo(long dbId, long tableId, String tableName, List deleteConditions) { this.dbId = dbId; this.tableId = tableId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java index 3723c226c5..eaa4395093 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java @@ -17,29 +17,74 @@ package org.apache.doris.load; +import org.apache.doris.analysis.BinaryPredicate; +import org.apache.doris.analysis.InPredicate; +import org.apache.doris.analysis.IsNullPredicate; +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.analysis.Predicate; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletInvertedIndex; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; +import org.apache.doris.planner.ColumnBound; +import org.apache.doris.planner.ColumnRange; +import org.apache.doris.planner.ListPartitionPrunerV2; +import org.apache.doris.planner.PartitionPruner; +import org.apache.doris.planner.RangePartitionPrunerV2; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.service.FrontendOptions; +import org.apache.doris.task.AgentBatchTask; +import org.apache.doris.task.AgentTaskExecutor; +import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.PushTask; +import org.apache.doris.thrift.TColumn; +import org.apache.doris.thrift.TPriority; +import org.apache.doris.thrift.TPushType; +import org.apache.doris.thrift.TTaskType; import org.apache.doris.transaction.AbstractTxnStateChangeCallback; +import org.apache.doris.transaction.GlobalTransactionMgr; +import org.apache.doris.transaction.TabletCommitInfo; import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TransactionStatus; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Range; import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; -public class DeleteJob extends AbstractTxnStateChangeCallback { +public class DeleteJob extends AbstractTxnStateChangeCallback implements DeleteJobLifeCycle { private static final Logger LOG = LogManager.getLogger(DeleteJob.class); + public static final String DELETE_PREFIX = "delete_"; + public enum DeleteState { UN_QUORUM, QUORUM_FINISHED, @@ -49,18 +94,28 @@ public class DeleteJob extends AbstractTxnStateChangeCallback { private DeleteState state; // jobId(listenerId). use in beginTransaction to callback function - private long id; + private final long id; // transaction id. private long signature; - private String label; - private Set totalTablets; - private Set quorumTablets; - private Set finishedTablets; + private final String label; + private final Set totalTablets; + private final Set quorumTablets; + private final Set finishedTablets; Map tabletDeleteInfoMap; - private Set pushTasks; - private DeleteInfo deleteInfo; + private final Set pushTasks; + private final DeleteInfo deleteInfo; - private Map partitionReplicaNum; + private final Map partitionReplicaNum; + + private Database targetDb; + + private OlapTable targetTbl; + + private List partitions; + + private List deleteConditions; + + private MarkedCountDownLatch countDownLatch; public DeleteJob(long id, long transactionId, String label, Map partitionReplicaNum, DeleteInfo deleteInfo) { @@ -77,13 +132,17 @@ public class DeleteJob extends AbstractTxnStateChangeCallback { this.partitionReplicaNum = partitionReplicaNum; } + public static Builder newBuilder() { + return new Builder(); + } + /** * check and update if this job's state is QUORUM_FINISHED or FINISHED * The meaning of state: * QUORUM_FINISHED: For each tablet there are more than half of its replicas have been finished - * FINISHED: All replicas of this jobs have finished + * FINISHED: All replicas of this job have finished */ - public void checkAndUpdateQuorum() throws MetaNotFoundException { + private void checkAndUpdateQuorum() throws MetaNotFoundException { long dbId = deleteInfo.getDbId(); Env.getCurrentInternalCatalog().getDbOrMetaException(dbId); @@ -122,32 +181,28 @@ public class DeleteJob extends AbstractTxnStateChangeCallback { signature, totalTablets.size(), quorumTablets.size(), dropCounter); if (finishedTablets.containsAll(totalTablets)) { - setState(DeleteState.FINISHED); + this.state = DeleteState.FINISHED; } else if (quorumTablets.containsAll(totalTablets)) { - setState(DeleteState.QUORUM_FINISHED); + this.state = DeleteState.QUORUM_FINISHED; } } - public void setState(DeleteState state) { - this.state = state; - } - public DeleteState getState() { return this.state; } - public boolean addTablet(long tabletId) { - return totalTablets.add(tabletId); + private void addTablet(long tabletId) { + totalTablets.add(tabletId); } - public boolean addPushTask(PushTask pushTask) { - return pushTasks.add(pushTask); + public void addPushTask(PushTask pushTask) { + pushTasks.add(pushTask); } - public boolean addFinishedReplica(long partitionId, long tabletId, Replica replica) { + public void addFinishedReplica(long partitionId, long tabletId, Replica replica) { tabletDeleteInfoMap.putIfAbsent(tabletId, new TabletDeleteInfo(partitionId, tabletId)); - TabletDeleteInfo tDeleteInfo = tabletDeleteInfoMap.get(tabletId); - return tDeleteInfo.addFinishedReplica(replica); + TabletDeleteInfo tDeleteInfo = tabletDeleteInfoMap.get(tabletId); + tDeleteInfo.addFinishedReplica(replica); } public DeleteInfo getDeleteInfo() { @@ -158,10 +213,6 @@ public class DeleteJob extends AbstractTxnStateChangeCallback { return this.label; } - public Set getPushTasks() { - return pushTasks; - } - @Override public long getId() { return this.id; @@ -177,14 +228,13 @@ public class DeleteJob extends AbstractTxnStateChangeCallback { } @Override - public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason) - throws UserException { + public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason) { // just to clean the callback Env.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(getId()); } public void executeFinish() { - setState(DeleteState.FINISHED); + this.state = DeleteState.FINISHED; Env.getCurrentEnv().getDeleteHandler().recordFinishedJob(this); Env.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(getId()); } @@ -206,4 +256,451 @@ public class DeleteJob extends AbstractTxnStateChangeCallback { long timeout = Math.max(totalTablets.size() * Config.tablet_delete_timeout_second * 1000L, 30000L); return Math.min(timeout, Config.delete_job_max_timeout_second * 1000L); } + + public void setTargetDb(Database targetDb) { + this.targetDb = targetDb; + } + + public void setTargetTbl(OlapTable targetTbl) { + this.targetTbl = targetTbl; + } + + public void setPartitions(List partitions) { + this.partitions = partitions; + } + + public void setDeleteConditions(List deleteConditions) { + this.deleteConditions = deleteConditions; + } + + public void setCountDownLatch(MarkedCountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + @Override + public long beginTxn() throws Exception { + long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(deleteInfo.getDbId(), + Lists.newArrayList(deleteInfo.getTableId()), label, null, + new TransactionState.TxnCoordinator( + TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + TransactionState.LoadJobSourceType.FRONTEND, id, Config.stream_load_default_timeout_second); + this.signature = txnId; + Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this); + return txnId; + } + + @Override + public void dispatch() throws Exception { + // task sent to be + AgentBatchTask batchTask = new AgentBatchTask(); + for (Partition partition : partitions) { + for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) { + long indexId = index.getId(); + int schemaHash = targetTbl.getSchemaHashByIndexId(indexId); + + List columnsDesc = Lists.newArrayList(); + for (Column column : targetTbl.getSchemaByIndexId(indexId)) { + columnsDesc.add(column.toThrift()); + } + + for (Tablet tablet : index.getTablets()) { + long tabletId = tablet.getId(); + + // set push type + TPushType type = TPushType.DELETE; + + for (Replica replica : tablet.getReplicas()) { + long replicaId = replica.getId(); + long backendId = replica.getBackendId(); + countDownLatch.addMark(backendId, tabletId); + + // create push task for each replica + PushTask pushTask = new PushTask(null, + replica.getBackendId(), targetDb.getId(), targetTbl.getId(), + partition.getId(), indexId, + tabletId, replicaId, schemaHash, + -1, "", -1, 0, + -1, type, deleteConditions, + true, TPriority.NORMAL, + TTaskType.REALTIME_PUSH, + signature, + Env.getCurrentGlobalTransactionMgr() + .getTransactionIDGenerator().getNextTransactionId(), + columnsDesc); + pushTask.setIsSchemaChanging(false); + pushTask.setCountDownLatch(countDownLatch); + + if (AgentTaskQueue.addTask(pushTask)) { + batchTask.addTask(pushTask); + addPushTask(pushTask); + addTablet(tabletId); + } + } + } + } + } + + // submit push tasks + if (batchTask.getTaskNum() > 0) { + AgentTaskExecutor.submit(batchTask); + } + } + + @Override + public void await() throws Exception { + long timeoutMs = getTimeoutMs(); + boolean ok = countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS); + if (ok) { + return; + } + + //handle failure + String errMsg = ""; + List> unfinishedMarks = countDownLatch.getLeftMarks(); + // only show at most 5 results + List> subList = unfinishedMarks.subList(0, Math.min(unfinishedMarks.size(), 5)); + if (!subList.isEmpty()) { + errMsg = "unfinished replicas [BackendId=TabletId]: " + Joiner.on(", ").join(subList); + } + LOG.warn(errMsg); + checkAndUpdateQuorum(); + switch (state) { + case UN_QUORUM: + LOG.warn("delete job timeout: transactionId {}, timeout {}, {}", + signature, timeoutMs, errMsg); + throw new UserException(String.format("delete job timeout, timeout(ms):%s, msg:%s", timeoutMs, errMsg)); + case QUORUM_FINISHED: + case FINISHED: + long nowQuorumTimeMs = System.currentTimeMillis(); + long endQuorumTimeoutMs = nowQuorumTimeMs + timeoutMs / 2; + // if job's state is quorum_finished then wait for a period of time and commit it. + while (state == DeleteState.QUORUM_FINISHED + && endQuorumTimeoutMs > nowQuorumTimeMs) { + checkAndUpdateQuorum(); + Thread.sleep(1000); + nowQuorumTimeMs = System.currentTimeMillis(); + LOG.debug("wait for quorum finished delete job: {}, txn id: {}", + id, signature); + } + break; + default: + throw new IllegalStateException("wrong delete job state: " + state.name()); + } + } + + @Override + public String commit() throws Exception { + TabletInvertedIndex currentInvertedIndex = Env.getCurrentInvertedIndex(); + List tabletCommitInfos = Lists.newArrayList(); + tabletDeleteInfoMap.forEach((tabletId, deleteInfo) -> deleteInfo.getFinishedReplicas() + .forEach(replica -> { + if (currentInvertedIndex.getTabletIdByReplica(replica.getId()) == null) { + LOG.warn("could not find tablet id for replica {}, the tablet maybe dropped", replica); + return; + } + tabletCommitInfos.add(new TabletCommitInfo(tabletId, replica.getBackendId())); + })); + boolean visible = Env.getCurrentGlobalTransactionMgr() + .commitAndPublishTransaction(targetDb, Lists.newArrayList(targetTbl), + signature, tabletCommitInfos, getTimeoutMs()); + + StringBuilder sb = new StringBuilder(); + sb.append("{'label':'").append(label); + sb.append("', 'txnId':'").append(signature) + .append("', 'status':'"); + if (visible) { + sb.append(TransactionStatus.VISIBLE.name()).append("'"); + sb.append("}"); + } else { + // Although publish is unfinished we should tell user that commit already success. + sb.append(TransactionStatus.COMMITTED.name()).append("'"); + String msg = "delete job is committed but may be taking effect later"; + sb.append(", 'msg':'").append(msg).append("'"); + sb.append("}"); + } + return sb.toString(); + } + + @Override + public void cancel(String reason) { + GlobalTransactionMgr globalTransactionMgr = Env.getCurrentGlobalTransactionMgr(); + try { + globalTransactionMgr.abortTransaction(deleteInfo.getDbId(), signature, reason); + } catch (Exception e) { + TransactionState state = globalTransactionMgr.getTransactionState( + deleteInfo.getDbId(), signature); + if (state == null) { + LOG.warn("cancel delete job failed because txn not found, transactionId: {}", + signature); + } else if (state.getTransactionStatus() == TransactionStatus.COMMITTED + || state.getTransactionStatus() == TransactionStatus.VISIBLE) { + LOG.warn("cancel delete job failed because it has been committed, transactionId: {}", + signature); + } else { + LOG.warn("errors while abort transaction", e); + } + } + } + + @Override + public void cleanUp() { + for (PushTask pushTask : pushTasks) { + AgentTaskQueue.removePushTask(pushTask.getBackendId(), pushTask.getSignature(), + pushTask.getVersion(), + pushTask.getPushType(), pushTask.getTaskType()); + } + } + + public static class BuildParams { + + private final Database db; + private final OlapTable table; + + private final Collection partitionNames; + + private final List deleteConditions; + + public BuildParams(Database db, OlapTable table, + Collection partitionNames, + List deleteConditions) { + this.db = db; + this.table = table; + this.partitionNames = partitionNames; + this.deleteConditions = deleteConditions; + } + + public OlapTable getTable() { + return table; + } + + public Collection getPartitionNames() { + return partitionNames; + } + + public Database getDb() { + return db; + } + + public List getDeleteConditions() { + return deleteConditions; + } + } + + public static class Builder { + + public DeleteJob buildWith(BuildParams params) throws Exception { + List partitions = getSelectedPartitions(params.getTable(), + params.getPartitionNames(), params.getDeleteConditions()); + Map partitionReplicaNum = partitions.stream() + .collect(Collectors.toMap( + Partition::getId, + partition -> + params.getTable() + .getPartitionInfo() + .getReplicaAllocation(partition.getId()) + .getTotalReplicaNum())); + // generate label + String label = DELETE_PREFIX + UUID.randomUUID(); + //generate jobId + long jobId = Env.getCurrentEnv().getNextId(); + DeleteInfo deleteInfo = new DeleteInfo(params.getDb().getId(), params.getTable().getId(), + params.getTable().getName(), getDeleteCondString(params.getDeleteConditions())); + DeleteJob deleteJob = new DeleteJob(jobId, -1, label, partitionReplicaNum, deleteInfo); + long replicaNum = partitions.stream().mapToLong(Partition::getAllReplicaCount).sum(); + deleteJob.setPartitions(partitions); + deleteJob.setDeleteConditions(params.getDeleteConditions()); + deleteJob.setTargetDb(params.getDb()); + deleteJob.setTargetTbl(params.getTable()); + deleteJob.setCountDownLatch(new MarkedCountDownLatch<>((int) replicaNum)); + return deleteJob; + } + + private List getSelectedPartitions(OlapTable olapTable, Collection partitionNames, + List deleteConditions) throws Exception { + if (partitionNames.isEmpty()) { + // Try to get selected partitions if no partition specified in delete statement + // Use PartitionPruner to generate the select partitions + if (olapTable.getPartitionInfo().getType() == PartitionType.RANGE + || olapTable.getPartitionInfo().getType() == PartitionType.LIST) { + Set partitionColumnNameSet = olapTable.getPartitionColumnNames(); + Map columnNameToRange = Maps.newHashMap(); + for (String colName : partitionColumnNameSet) { + ColumnRange columnRange = createColumnRange(olapTable, colName, deleteConditions); + // Not all partition columns are involved in predicate conditions + if (columnRange != null) { + columnNameToRange.put(colName, columnRange); + } + } + + Collection selectedPartitionId = null; + if (!columnNameToRange.isEmpty()) { + PartitionInfo partitionInfo = olapTable.getPartitionInfo(); + Map keyItemMap = partitionInfo.getIdToItem(false); + PartitionPruner pruner = olapTable.getPartitionInfo().getType() == PartitionType.RANGE + ? new RangePartitionPrunerV2(keyItemMap, partitionInfo.getPartitionColumns(), + columnNameToRange) + : new ListPartitionPrunerV2(keyItemMap, partitionInfo.getPartitionColumns(), + columnNameToRange); + selectedPartitionId = pruner.prune(); + } + // selectedPartitionId is empty means no partition matches conditions. + // How to return empty set in such case? + if (selectedPartitionId != null && !selectedPartitionId.isEmpty()) { + for (long partitionId : selectedPartitionId) { + partitionNames.add(olapTable.getPartition(partitionId).getName()); + } + } else { + if (!ConnectContext.get().getSessionVariable().isDeleteWithoutPartition()) { + throw new UserException("This is a range or list partitioned table." + + " You should specify partition in delete stmt," + + " or set delete_without_partition to true"); + } else { + partitionNames.addAll(olapTable.getPartitionNames()); + } + } + } else if (olapTable.getPartitionInfo().getType() == PartitionType.UNPARTITIONED) { + // this is an un-partitioned table, use table name as partition name + partitionNames.add(olapTable.getName()); + } else { + throw new UserException("Unknown partition type: " + olapTable.getPartitionInfo().getType()); + } + } + List partitions = Lists.newArrayList(); + for (String partName : partitionNames) { + Partition partition = olapTable.getPartition(partName); + if (partition == null) { + throw new DdlException("Partition does not exist. name: " + partName); + } + partitions.add(partition); + } + return partitions; + } + + // Return null if there is no filter for the partition column + private ColumnRange createColumnRange(OlapTable table, String colName, List conditions) + throws AnalysisException { + + ColumnRange result = ColumnRange.create(); + Type type = + table.getBaseSchema().stream().filter(c -> c.getName().equalsIgnoreCase(colName)) + .findFirst().get().getType(); + + boolean hasRange = false; + for (Predicate predicate : conditions) { + List> bounds = createColumnRange(colName, predicate, type); + if (bounds != null) { + hasRange = true; + result.intersect(bounds); + } + } + if (hasRange) { + return result; + } else { + return null; + } + } + + // Return null if the condition is not related to the partition column, + // or the operator is not supported. + private List> createColumnRange(String colName, Predicate condition, Type type) + throws AnalysisException { + List> result = Lists.newLinkedList(); + if (condition instanceof BinaryPredicate) { + BinaryPredicate binaryPredicate = (BinaryPredicate) condition; + if (!(binaryPredicate.getChild(0) instanceof SlotRef)) { + return null; + } + String columnName = ((SlotRef) binaryPredicate.getChild(0)).getColumnName(); + if (!colName.equalsIgnoreCase(columnName)) { + return null; + } + ColumnBound bound = ColumnBound.of( + LiteralExpr.create(binaryPredicate.getChild(1).getStringValue(), type)); + switch (binaryPredicate.getOp()) { + case EQ: + result.add(Range.closed(bound, bound)); + break; + case GE: + result.add(Range.atLeast(bound)); + break; + case GT: + result.add(Range.greaterThan(bound)); + break; + case LT: + result.add(Range.lessThan(bound)); + break; + case LE: + result.add(Range.atMost(bound)); + break; + case NE: + result.add(Range.lessThan(bound)); + result.add(Range.greaterThan(bound)); + break; + default: + return null; + } + } else if (condition instanceof InPredicate) { + InPredicate inPredicate = (InPredicate) condition; + if (!(inPredicate.getChild(0) instanceof SlotRef)) { + return null; + } + String columnName = ((SlotRef) inPredicate.getChild(0)).getColumnName(); + if (!colName.equals(columnName)) { + return null; + } + if (inPredicate.isNotIn()) { + return null; + } + for (int i = 1; i <= inPredicate.getInElementNum(); i++) { + ColumnBound bound = ColumnBound.of(LiteralExpr + .create(inPredicate.getChild(i).getStringValue(), type)); + result.add(Range.closed(bound, bound)); + } + } else { + return null; + } + return result; + } + + private List getDeleteCondString(List conditions) { + List deleteConditions = Lists.newArrayListWithCapacity(conditions.size()); + // save delete conditions + for (Predicate condition : conditions) { + if (condition instanceof BinaryPredicate) { + BinaryPredicate binaryPredicate = (BinaryPredicate) condition; + SlotRef slotRef = (SlotRef) binaryPredicate.getChild(0); + String columnName = slotRef.getColumnName(); + String sb = columnName + " " + binaryPredicate.getOp().name() + " \"" + + binaryPredicate.getChild(1).getStringValue() + "\""; + deleteConditions.add(sb); + } else if (condition instanceof IsNullPredicate) { + IsNullPredicate isNullPredicate = (IsNullPredicate) condition; + SlotRef slotRef = (SlotRef) isNullPredicate.getChild(0); + String columnName = slotRef.getColumnName(); + StringBuilder sb = new StringBuilder(); + sb.append(columnName); + if (isNullPredicate.isNotNull()) { + sb.append(" IS NOT NULL"); + } else { + sb.append(" IS NULL"); + } + deleteConditions.add(sb.toString()); + } else if (condition instanceof InPredicate) { + InPredicate inPredicate = (InPredicate) condition; + SlotRef slotRef = (SlotRef) inPredicate.getChild(0); + String columnName = slotRef.getColumnName(); + StringBuilder strBuilder = new StringBuilder(); + String notStr = inPredicate.isNotIn() ? "NOT " : ""; + strBuilder.append(columnName).append(" ").append(notStr).append("IN ("); + for (int i = 1; i <= inPredicate.getInElementNum(); ++i) { + strBuilder.append(inPredicate.getChild(i).toSql()); + strBuilder.append((i != inPredicate.getInElementNum()) ? ", " : ""); + } + strBuilder.append(")"); + deleteConditions.add(strBuilder.toString()); + } + } + return deleteConditions; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJobLifeCycle.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJobLifeCycle.java new file mode 100644 index 0000000000..c12dd84169 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJobLifeCycle.java @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load; + +public interface DeleteJobLifeCycle { + + /** + * @return txn id + */ + long beginTxn() throws Exception; + + /** + * dispatch push tasks in an async way + */ + void dispatch() throws Exception; + + /** + * called after dispatch, waiting for quorum to be finished + */ + void await() throws Exception; + + /** + * commit job + * @return commit msg + */ + String commit() throws Exception; + + void cancel(String reason) throws Exception; + + void cleanUp() throws Exception; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index 8e62b436b8..b09f531c07 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -78,7 +78,6 @@ import org.apache.doris.analysis.CreateUserStmt; import org.apache.doris.analysis.CreateViewStmt; import org.apache.doris.analysis.CreateWorkloadGroupStmt; import org.apache.doris.analysis.DdlStmt; -import org.apache.doris.analysis.DeleteStmt; import org.apache.doris.analysis.DropAnalyzeJobStmt; import org.apache.doris.analysis.DropCatalogStmt; import org.apache.doris.analysis.DropDbStmt; @@ -200,8 +199,6 @@ public class DdlExecutor { } else if (ddlStmt instanceof ResumeJobStmt) { ResumeJobStmt stmt = (ResumeJobStmt) ddlStmt; env.getJobRegister().resumeJob(stmt.getDbFullName(), stmt.getName(), JobCategory.SQL); - } else if (ddlStmt instanceof DeleteStmt) { - env.getDeleteHandler().process((DeleteStmt) ddlStmt); } else if (ddlStmt instanceof CreateUserStmt) { CreateUserStmt stmt = (CreateUserStmt) ddlStmt; env.getAuth().createUser(stmt); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 2ce0de22cb..360353f2c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -766,8 +766,14 @@ public class StmtExecutor { } else if (parsedStmt instanceof UpdateStmt) { handleUpdateStmt(); } else if (parsedStmt instanceof DdlStmt) { - if (parsedStmt instanceof DeleteStmt && ((DeleteStmt) parsedStmt).getInsertStmt() != null) { - handleDeleteStmt(); + if (parsedStmt instanceof DeleteStmt) { + if (((DeleteStmt) parsedStmt).getInsertStmt() != null) { + handleDeleteStmt(); + } else { + Env.getCurrentEnv() + .getDeleteHandler() + .process((DeleteStmt) parsedStmt, context.getState()); + } } else { handleDdlStmt(); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java index 5c62ef75a4..18f29ac30d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java @@ -43,6 +43,7 @@ import org.apache.doris.mysql.privilege.AccessControllerManager; import org.apache.doris.mysql.privilege.Auth; import org.apache.doris.persist.EditLog; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.QueryState; import org.apache.doris.qe.QueryStateException; import org.apache.doris.system.SystemInfoService; import org.apache.doris.task.AgentBatchTask; @@ -213,8 +214,8 @@ public class DeleteHandlerTest { }; } - @Test(expected = DdlException.class) - public void testUnQuorumTimeout() throws DdlException, QueryStateException { + @Test + public void testUnQuorumTimeout() throws DdlException { BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.GT, new SlotRef(null, "k1"), new IntLiteral(3)); @@ -231,8 +232,9 @@ public class DeleteHandlerTest { minTimes = 0; } }; - deleteHandler.process(deleteStmt); - Assert.fail(); + QueryState state = connectContext.getState(); + deleteHandler.process(deleteStmt, state); + Assert.assertSame(state.getStateType(), QueryState.MysqlStateType.ERR); } @Test @@ -265,11 +267,9 @@ public class DeleteHandlerTest { } }; - try { - deleteHandler.process(deleteStmt); - } catch (QueryStateException e) { - // CHECKSTYLE IGNORE THIS LINE - } + QueryState state = connectContext.getState(); + deleteHandler.process(deleteStmt, state); + Assert.assertSame(state.getStateType(), QueryState.MysqlStateType.OK); Map idToDeleteJob = Deencapsulation.getField(deleteHandler, "idToDeleteJob"); Collection jobs = idToDeleteJob.values(); @@ -310,11 +310,9 @@ public class DeleteHandlerTest { } }; - try { - deleteHandler.process(deleteStmt); - } catch (QueryStateException e) { - // CHECKSTYLE IGNORE THIS LINE - } + QueryState state = connectContext.getState(); + deleteHandler.process(deleteStmt, state); + Assert.assertSame(state.getStateType(), QueryState.MysqlStateType.OK); Map idToDeleteJob = Deencapsulation.getField(deleteHandler, "idToDeleteJob"); Collection jobs = idToDeleteJob.values(); @@ -324,7 +322,7 @@ public class DeleteHandlerTest { } } - @Test(expected = DdlException.class) + @Test public void testCommitFail(@Mocked MarkedCountDownLatch countDownLatch) throws DdlException, QueryStateException { BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.GT, new SlotRef(null, "k1"), new IntLiteral(3)); @@ -368,20 +366,8 @@ public class DeleteHandlerTest { } }; - try { - deleteHandler.process(deleteStmt); - } catch (DdlException e) { - Map idToDeleteJob = Deencapsulation.getField(deleteHandler, "idToDeleteJob"); - Collection jobs = idToDeleteJob.values(); - Assert.assertEquals(1, jobs.size()); - for (DeleteJob job : jobs) { - Assert.assertEquals(job.getState(), DeleteState.FINISHED); - } - throw e; - } catch (QueryStateException e) { - // CHECKSTYLE IGNORE THIS LINE - } - Assert.fail(); + deleteHandler.process(deleteStmt, connectContext.getState()); + Assert.assertSame(connectContext.getState().getStateType(), QueryState.MysqlStateType.ERR); } @Test @@ -423,12 +409,9 @@ public class DeleteHandlerTest { minTimes = 0; } }; - - try { - deleteHandler.process(deleteStmt); - } catch (QueryStateException e) { - // CHECKSTYLE IGNORE THIS LINE - } + QueryState state = connectContext.getState(); + deleteHandler.process(deleteStmt, state); + Assert.assertSame(state.getStateType(), QueryState.MysqlStateType.OK); Map idToDeleteJob = Deencapsulation.getField(deleteHandler, "idToDeleteJob"); Collection jobs = idToDeleteJob.values(); @@ -471,11 +454,9 @@ public class DeleteHandlerTest { } }; - try { - deleteHandler.process(deleteStmt); - } catch (QueryStateException e) { - // CHECKSTYLE IGNORE THIS LINE - } + QueryState state = connectContext.getState(); + deleteHandler.process(deleteStmt, state); + Assert.assertSame(state.getStateType(), QueryState.MysqlStateType.OK); Map idToDeleteJob = Deencapsulation.getField(deleteHandler, "idToDeleteJob"); Collection jobs = idToDeleteJob.values();