[enhancement](group_commit) refector wal manager code (#29560)

This commit is contained in:
huanghaibin
2024-01-07 18:54:41 +08:00
committed by GitHub
parent e7b5f79664
commit 0b731800a0
19 changed files with 217 additions and 333 deletions

View File

@ -537,9 +537,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
}
return;
}
long maxWalId = Env.getCurrentGlobalTransactionMgr()
.getTransactionIDGenerator().getNextTransactionId();
waitWalFinished(maxWalId);
waitWalFinished();
/*
* all tasks are finished. check the integrity.
* we just check whether all new replicas are healthy.
@ -602,20 +600,22 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
LOG.info("set table's state to NORMAL, table id: {}, job id: {}", tableId, jobId);
}
private void waitWalFinished(long maxWalId) {
private void waitWalFinished() {
// wait wal done here
Env.getCurrentEnv().getGroupCommitManager().setStatus(tableId, SchemaChangeStatus.BLOCK);
LOG.info("block table {}", tableId);
List<Long> aliveBeIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold;
boolean walFinished = false;
while (System.currentTimeMillis() < expireTime) {
while (true) {
LOG.info("wai for wal queue size to be empty");
walFinished = Env.getCurrentEnv().getGroupCommitManager()
.isPreviousWalFinished(tableId, maxWalId, aliveBeIds);
boolean walFinished = Env.getCurrentEnv().getGroupCommitManager()
.isPreviousWalFinished(tableId, aliveBeIds);
if (walFinished) {
LOG.info("all wal is finished");
break;
} else if (System.currentTimeMillis() > expireTime) {
LOG.warn("waitWalFinished time out");
break;
} else {
try {
Thread.sleep(100);
@ -624,9 +624,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
}
}
}
if (!walFinished) {
LOG.warn("waitWalFinished time out");
}
Env.getCurrentEnv().getGroupCommitManager().setStatus(tableId, SchemaChangeStatus.NORMAL);
LOG.info("release table {}", tableId);
}

View File

@ -60,7 +60,7 @@ public class GroupCommitManager {
/**
* Check the wal before the endTransactionId is finished or not.
*/
public boolean isPreviousWalFinished(long tableId, long endTransactionId, List<Long> aliveBeIds) {
public boolean isPreviousWalFinished(long tableId, List<Long> aliveBeIds) {
boolean empty = true;
for (int i = 0; i < aliveBeIds.size(); i++) {
Backend backend = Env.getCurrentSystemInfo().getBackend(aliveBeIds.get(i));
@ -70,9 +70,8 @@ public class GroupCommitManager {
}
PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder()
.setTableId(tableId)
.setTxnId(endTransactionId)
.build();
long size = getWallQueueSize(backend, request);
long size = getWalQueueSize(backend, request);
if (size > 0) {
LOG.info("backend id:" + backend.getId() + ",wal size:" + size);
empty = false;
@ -84,16 +83,15 @@ public class GroupCommitManager {
public long getAllWalQueueSize(Backend backend) {
PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder()
.setTableId(-1)
.setTxnId(-1)
.build();
long size = getWallQueueSize(backend, request);
long size = getWalQueueSize(backend, request);
if (size > 0) {
LOG.info("backend id:" + backend.getId() + ",all wal size:" + size);
}
return size;
}
public long getWallQueueSize(Backend backend, PGetWalQueueSizeRequest request) {
public long getWalQueueSize(Backend backend, PGetWalQueueSizeRequest request) {
PGetWalQueueSizeResponse response = null;
long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold;
long size = 0;

View File

@ -330,8 +330,10 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
List<Column> fileColumns = new ArrayList<>();
Table table = Env.getCurrentInternalCatalog().getTableByTableId(tableId);
List<Column> tableColumns = table.getBaseSchema(true);
for (int i = 1; i <= tableColumns.size(); i++) {
fileColumns.add(new Column("c" + i, tableColumns.get(i - 1).getType(), true));
for (int i = 0; i < tableColumns.size(); i++) {
Column column = new Column(tableColumns.get(i).getName(), tableColumns.get(i).getType(), true);
column.setUniqueId(tableColumns.get(i).getUniqueId());
fileColumns.add(column);
}
return fileColumns;
}