diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp index 54500273da..a6d7a4054c 100644 --- a/be/src/olap/wal/wal_table.cpp +++ b/be/src/olap/wal/wal_table.cpp @@ -234,6 +234,7 @@ Status WalTable::_construct_sql_str(const std::string& wal, const std::string& l auto it = column_info_map.find(column_id); if (it != column_info_map.end()) { ss_name << "`" << it->second << "`,"; + column_info_map.erase(column_id); } } catch (const std::invalid_argument& e) { return Status::InvalidArgument("Invalid format, {}", e.what()); diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 6d2ec020e7..692f7c6846 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -61,6 +61,8 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state, if (block->rows() > 0) { if (!config::group_commit_wait_replay_wal_finish) { _block_queue.push_back(block); + _data_bytes += block->bytes(); + _all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed); } else { LOG(INFO) << "skip adding block to queue on txn " << txn_id; } @@ -71,8 +73,6 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state, return st; } } - _data_bytes += block->bytes(); - _all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed); } if (_data_bytes >= _group_commit_data_bytes) { VLOG_DEBUG << "group commit meets commit condition for data size, label=" << label diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index d40020fd57..eef902dea8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -607,7 +607,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { List aliveBeIds = Env.getCurrentSystemInfo().getAllBackendIds(true); long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold; while (true) { - LOG.info("wai for wal queue size to be empty"); + LOG.info("wait for wal queue size to be empty"); boolean walFinished = Env.getCurrentEnv().getGroupCommitManager() .isPreviousWalFinished(tableId, aliveBeIds); if (walFinished) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 787f77f1da..2b4f20a57c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -333,7 +333,12 @@ public class LoadAction extends RestBaseController { .setEnableRoundRobin(true) .needLoadAvailable().build(); policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate(); - List backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1); + List backendIds; + if (groupCommit) { + backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, -1); + } else { + backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1); + } if (backendIds.isEmpty()) { throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); }