diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index c8b7391f8e..dc0f4510df 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1529,16 +1529,17 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() { std::map succ_tablets; // partition_id, tablet_id, publish_version std::vector> discontinuous_version_tablets; - std::map tablet_id_to_num_delta_rows; + std::map table_id_to_num_delta_rows; uint32_t retry_time = 0; Status status; bool is_task_timeout = false; while (retry_time < PUBLISH_VERSION_MAX_RETRY) { succ_tablets.clear(); error_tablet_ids.clear(); + table_id_to_num_delta_rows.clear(); EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids, &succ_tablets, &discontinuous_version_tablets, - &tablet_id_to_num_delta_rows); + &table_id_to_num_delta_rows); status = StorageEngine::instance()->execute_task(&engine_task); if (status.ok()) { break; @@ -1623,7 +1624,7 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() { finish_task_request.__set_succ_tablets(succ_tablets); finish_task_request.__set_error_tablet_ids( std::vector(error_tablet_ids.begin(), error_tablet_ids.end())); - finish_task_request.__set_tablet_id_to_delta_num_rows(tablet_id_to_num_delta_rows); + finish_task_request.__set_table_id_to_delta_num_rows(table_id_to_num_delta_rows); _finish_task(finish_task_request); _remove_task_info(agent_task_req.task_type, agent_task_req.signature); } diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 910d2c86ab..a6c654ea4b 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include "common/logging.h" @@ -73,12 +74,12 @@ EnginePublishVersionTask::EnginePublishVersionTask( const TPublishVersionRequest& publish_version_req, std::set* error_tablet_ids, std::map* succ_tablets, std::vector>* discontinuous_version_tablets, - std::map* tablet_id_to_num_delta_rows) + std::map* table_id_to_num_delta_rows) : _publish_version_req(publish_version_req), _error_tablet_ids(error_tablet_ids), _succ_tablets(succ_tablets), _discontinuous_version_tablets(discontinuous_version_tablets), - _tablet_id_to_num_delta_rows(tablet_id_to_num_delta_rows) {} + _table_id_to_num_delta_rows(table_id_to_num_delta_rows) {} void EnginePublishVersionTask::add_error_tablet_id(int64_t tablet_id) { std::lock_guard lck(_tablet_ids_mutex); @@ -93,7 +94,7 @@ Status EnginePublishVersionTask::finish() { std::unique_ptr token = StorageEngine::instance()->tablet_publish_txn_thread_pool()->new_token( ThreadPool::ExecutionMode::CONCURRENT); - + std::unordered_map tablet_id_to_num_delta_rows; // each partition for (auto& par_ver_info : _publish_version_req.partition_version_infos) { int64_t partition_id = par_ver_info.partition_id; @@ -189,9 +190,11 @@ Status EnginePublishVersionTask::finish() { continue; } } + auto rowset_meta_ptr = rowset->rowset_meta(); - _tablet_id_to_num_delta_rows->insert( + tablet_id_to_num_delta_rows.insert( {rowset_meta_ptr->tablet_id(), rowset_meta_ptr->num_rows()}); + auto tablet_publish_txn_ptr = std::make_shared( this, tablet, rowset, partition_id, transaction_id, version, tablet_info); auto submit_st = token->submit_func([=]() { tablet_publish_txn_ptr->handle(); }); @@ -208,7 +211,6 @@ Status EnginePublishVersionTask::finish() { std::set partition_related_tablet_infos; StorageEngine::instance()->tablet_manager()->get_partition_related_tablets( partition_id, &partition_related_tablet_infos); - Version version(par_ver_info.version, par_ver_info.version); for (auto& tablet_info : partition_related_tablet_infos) { TabletSharedPtr tablet = @@ -245,6 +247,7 @@ Status EnginePublishVersionTask::finish() { } } } + _calculate_tbl_num_delta_rows(tablet_id_to_num_delta_rows); if (!res.is()) { LOG(INFO) << "finish to publish version on transaction." @@ -256,6 +259,15 @@ Status EnginePublishVersionTask::finish() { return res; } +void EnginePublishVersionTask::_calculate_tbl_num_delta_rows( + const std::unordered_map& tablet_id_to_num_delta_rows) { + for (const auto& kv : tablet_id_to_num_delta_rows) { + auto table_id = + StorageEngine::instance()->tablet_manager()->get_tablet(kv.first)->get_table_id(); + (*_table_id_to_num_delta_rows)[table_id] += kv.second; + } +} + TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task, TabletSharedPtr tablet, RowsetSharedPtr rowset, int64_t partition_id, int64_t transaction_id, diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h index 46ad5f9949..8f3790574a 100644 --- a/be/src/olap/task/engine_publish_version_task.h +++ b/be/src/olap/task/engine_publish_version_task.h @@ -88,7 +88,7 @@ public: const TPublishVersionRequest& publish_version_req, std::set* error_tablet_ids, std::map* succ_tablets, std::vector>* discontinous_version_tablets, - std::map* tablet_id_to_num_delta_rows); + std::map* table_id_to_num_delta_rows); ~EnginePublishVersionTask() override = default; Status finish() override; @@ -98,12 +98,15 @@ public: int64_t finish_task(); private: + void _calculate_tbl_num_delta_rows( + const std::unordered_map& tablet_id_to_num_delta_rows); + const TPublishVersionRequest& _publish_version_req; std::mutex _tablet_ids_mutex; std::set* _error_tablet_ids; std::map* _succ_tablets; std::vector>* _discontinuous_version_tablets; - std::map* _tablet_id_to_num_delta_rows; + std::map* _table_id_to_num_delta_rows; }; class AsyncTabletPublishTask { diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index 4cffd1b38b..2833eff5f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -491,8 +491,8 @@ public class MasterImpl { // not remove the task from queue and be will retry return; } - if (request.isSetTabletIdToDeltaNumRows()) { - publishVersionTask.setTabletIdToDeltaNumRows(request.getTabletIdToDeltaNumRows()); + if (request.isSetTableIdToDeltaNumRows()) { + publishVersionTask.setTableIdToDeltaNumRows(request.getTableIdToDeltaNumRows()); } AgentTaskQueue.removeTask(publishVersionTask.getBackendId(), publishVersionTask.getTaskType(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java index 998733657a..74cff551b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java @@ -40,9 +40,9 @@ public class PublishVersionTask extends AgentTask { private Map succTablets; /** - * To collect loaded rows for each tablet from each BE + * To collect loaded rows for each table from each BE */ - private final Map tabletIdToDeltaNumRows = Maps.newHashMap(); + private final Map tableIdToDeltaNumRows = Maps.newHashMap(); public PublishVersionTask(long backendId, long transactionId, long dbId, List partitionVersionInfos, long createTime) { @@ -88,11 +88,11 @@ public class PublishVersionTask extends AgentTask { this.errorTablets.addAll(errorTablets); } - public void setTabletIdToDeltaNumRows(Map tabletIdToDeltaNumRows) { - this.tabletIdToDeltaNumRows.putAll(tabletIdToDeltaNumRows); + public void setTableIdToDeltaNumRows(Map tabletIdToDeltaNumRows) { + this.tableIdToDeltaNumRows.putAll(tabletIdToDeltaNumRows); } - public Map getTabletIdToDeltaNumRows() { - return tabletIdToDeltaNumRows; + public Map getTableIdToDeltaNumRows() { + return tableIdToDeltaNumRows; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 973a2a483b..1921402cfd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -1799,8 +1799,17 @@ public class DatabaseTransactionMgr { } } AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); - LOG.debug("table id to loaded rows:{}", transactionState.getTableIdToNumDeltaRows()); - transactionState.getTableIdToNumDeltaRows().forEach(analysisManager::updateUpdatedRows); + Map tableIdToTotalNumDeltaRows = transactionState.getTableIdToTotalNumDeltaRows(); + LOG.debug("table id to loaded rows:{}", tableIdToTotalNumDeltaRows); + Map tableIdToNumDeltaRows = Maps.newHashMap(); + tableIdToTotalNumDeltaRows + .forEach((tableId, numRows) -> { + OlapTable table = (OlapTable) db.getTableNullable(tableId); + if (table != null) { + tableIdToNumDeltaRows.put(tableId, numRows / table.getReplicaCount()); + } + }); + tableIdToNumDeltaRows.forEach(analysisManager::updateUpdatedRows); return true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index af8c4a2e1b..747508d211 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -18,8 +18,6 @@ package org.apache.doris.transaction; import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.TabletInvertedIndex; -import org.apache.doris.catalog.TabletMeta; import org.apache.doris.common.Config; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.metric.MetricRepo; @@ -127,8 +125,6 @@ public class PublishVersionDaemon extends MasterDaemon { AgentTaskExecutor.submit(batchTask); } - TabletInvertedIndex tabletInvertedIndex = Env.getCurrentEnv().getTabletInvertedIndex(); - Set tabletIdFilter = Sets.newHashSet(); Map tableIdToNumDeltaRows = Maps.newHashMap(); // try to finish the transaction, if failed just retry in next loop for (TransactionState transactionState : readyTransactionStates) { @@ -138,26 +134,18 @@ public class PublishVersionDaemon extends MasterDaemon { .stream() .peek(task -> { if (task.isFinished() && CollectionUtils.isEmpty(task.getErrorTablets())) { - Map tabletIdToDeltaNumRows = - task.getTabletIdToDeltaNumRows(); - tabletIdToDeltaNumRows.forEach((tabletId, numRows) -> { - if (!tabletIdFilter.add(tabletId)) { - // means the delta num rows for this tablet id has been collected - return; - } - TabletMeta tabletMeta = tabletInvertedIndex.getTabletMeta(tabletId); - if (tabletMeta == null) { - // for delete, drop, schema change etc. here may be a null value - return; - } - long tableId = tabletMeta.getTableId(); - tableIdToNumDeltaRows.computeIfPresent(tableId, (tblId, orgNum) -> orgNum + numRows); + Map tableIdToDeltaNumRows = + task.getTableIdToDeltaNumRows(); + tableIdToDeltaNumRows.forEach((tableId, numRows) -> { + tableIdToDeltaNumRows + .computeIfPresent(tableId, (id, orgNumRows) -> orgNumRows + numRows); tableIdToNumDeltaRows.putIfAbsent(tableId, numRows); }); } }); boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream .anyMatch(task -> !task.isFinished() && infoService.checkBackendAlive(task.getBackendId())); + transactionState.setTableIdToTotalNumDeltaRows(tableIdToNumDeltaRows); boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || transactionState.isPublishTimeout(); if (shouldFinishTxn) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index 4d72490f77..5d95917e58 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -253,7 +253,10 @@ public class TransactionState implements Writable { // tbl id -> (index ids) private Map> loadedTblIndexes = Maps.newHashMap(); - private Map tableIdToNumDeltaRows = Maps.newHashMap(); + /** + * the value is the num delta rows of all replicas in each table + */ + private final Map tableIdToTotalNumDeltaRows = Maps.newHashMap(); private String errorLogUrl = null; @@ -703,12 +706,12 @@ public class TransactionState implements Writable { } } - public Map getTableIdToNumDeltaRows() { - return tableIdToNumDeltaRows; + public Map getTableIdToTotalNumDeltaRows() { + return tableIdToTotalNumDeltaRows; } - public void setTableIdToNumDeltaRows(Map tableIdToNumDeltaRows) { - this.tableIdToNumDeltaRows.putAll(tableIdToNumDeltaRows); + public void setTableIdToTotalNumDeltaRows(Map tableIdToTotalNumDeltaRows) { + this.tableIdToTotalNumDeltaRows.putAll(tableIdToTotalNumDeltaRows); } public void setErrorMsg(String errMsg) { diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift index 0e56c0e658..9acd3f85f7 100644 --- a/gensrc/thrift/MasterService.thrift +++ b/gensrc/thrift/MasterService.thrift @@ -66,7 +66,7 @@ struct TFinishTaskRequest { 15: optional i64 copy_size 16: optional i64 copy_time_ms 17: optional map succ_tablets - 18: optional map tablet_id_to_delta_num_rows + 18: optional map table_id_to_delta_num_rows } struct TTablet {