[bug](group_commit) fix relay wal problem on materialized-view (#29848)
This commit is contained in:
@ -234,6 +234,7 @@ Status WalTable::_construct_sql_str(const std::string& wal, const std::string& l
|
||||
auto it = column_info_map.find(column_id);
|
||||
if (it != column_info_map.end()) {
|
||||
ss_name << "`" << it->second << "`,";
|
||||
column_info_map.erase(column_id);
|
||||
}
|
||||
} catch (const std::invalid_argument& e) {
|
||||
return Status::InvalidArgument("Invalid format, {}", e.what());
|
||||
|
||||
@ -61,6 +61,8 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
|
||||
if (block->rows() > 0) {
|
||||
if (!config::group_commit_wait_replay_wal_finish) {
|
||||
_block_queue.push_back(block);
|
||||
_data_bytes += block->bytes();
|
||||
_all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed);
|
||||
} else {
|
||||
LOG(INFO) << "skip adding block to queue on txn " << txn_id;
|
||||
}
|
||||
@ -71,8 +73,6 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
|
||||
return st;
|
||||
}
|
||||
}
|
||||
_data_bytes += block->bytes();
|
||||
_all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed);
|
||||
}
|
||||
if (_data_bytes >= _group_commit_data_bytes) {
|
||||
VLOG_DEBUG << "group commit meets commit condition for data size, label=" << label
|
||||
|
||||
@ -607,7 +607,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
|
||||
List<Long> aliveBeIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
|
||||
long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold;
|
||||
while (true) {
|
||||
LOG.info("wai for wal queue size to be empty");
|
||||
LOG.info("wait for wal queue size to be empty");
|
||||
boolean walFinished = Env.getCurrentEnv().getGroupCommitManager()
|
||||
.isPreviousWalFinished(tableId, aliveBeIds);
|
||||
if (walFinished) {
|
||||
|
||||
@ -333,7 +333,12 @@ public class LoadAction extends RestBaseController {
|
||||
.setEnableRoundRobin(true)
|
||||
.needLoadAvailable().build();
|
||||
policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate();
|
||||
List<Long> backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
|
||||
List<Long> backendIds;
|
||||
if (groupCommit) {
|
||||
backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, -1);
|
||||
} else {
|
||||
backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
|
||||
}
|
||||
if (backendIds.isEmpty()) {
|
||||
throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user