[fix](group commit) Fix some group commit case (#30132)

This commit is contained in:
meiyi
2024-01-21 11:42:25 +08:00
committed by yiguolei
parent a9ab094614
commit 1bb1d35f70
8 changed files with 38 additions and 50 deletions

View File

@ -48,7 +48,6 @@ import org.apache.doris.common.SchemaVersionAndHash;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DbUtil;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.GroupCommitManager.SchemaChangeStatus;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
@ -602,8 +601,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
private void waitWalFinished() {
// wait wal done here
Env.getCurrentEnv().getGroupCommitManager().setStatus(tableId, SchemaChangeStatus.BLOCK);
LOG.info("block table {}", tableId);
Env.getCurrentEnv().getGroupCommitManager().blockTable(tableId);
LOG.info("block group commit for table={} when schema change", tableId);
List<Long> aliveBeIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold;
while (true) {
@ -611,21 +610,21 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
boolean walFinished = Env.getCurrentEnv().getGroupCommitManager()
.isPreviousWalFinished(tableId, aliveBeIds);
if (walFinished) {
LOG.info("all wal is finished");
LOG.info("all wal is finished for table={}", tableId);
break;
} else if (System.currentTimeMillis() > expireTime) {
LOG.warn("waitWalFinished time out");
LOG.warn("waitWalFinished time out for table={}", tableId);
break;
} else {
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
LOG.info("schema change job sleep wait for wal InterruptedException: ", ie);
LOG.warn("failed to wait for wal for table={} when schema change", tableId, ie);
}
}
}
Env.getCurrentEnv().getGroupCommitManager().setStatus(tableId, SchemaChangeStatus.NORMAL);
LOG.info("release table {}", tableId);
Env.getCurrentEnv().getGroupCommitManager().unblockTable(tableId);
LOG.info("unblock group commit for table={} when schema change", tableId);
}
private void onFinished(OlapTable tbl) {

View File

@ -83,7 +83,8 @@ public class SystemHandler extends AlterHandler {
}
List<Long> backendTabletIds = invertedIndex.getTabletIdsByBackendId(beId);
if (Config.drop_backend_after_decommission && checkTablets(beId, backendTabletIds) && checkWal(backend)) {
boolean hasWal = checkWal(backend);
if (Config.drop_backend_after_decommission && checkTablets(beId, backendTabletIds) && hasWal) {
try {
systemInfoService.dropBackend(beId);
LOG.info("no available tablet on decommission backend {}, drop it", beId);
@ -94,8 +95,9 @@ public class SystemHandler extends AlterHandler {
continue;
}
LOG.info("backend {} lefts {} replicas to decommission: {}", beId, backendTabletIds.size(),
backendTabletIds.subList(0, Math.min(10, backendTabletIds.size())));
LOG.info("backend {} lefts {} replicas to decommission: {}{}", beId, backendTabletIds.size(),
backendTabletIds.subList(0, Math.min(10, backendTabletIds.size())),
hasWal ? "; and has unfinished WALs" : "");
}
}
@ -197,8 +199,7 @@ public class SystemHandler extends AlterHandler {
}
private boolean checkWal(Backend backend) {
return Env.getCurrentEnv().getGroupCommitManager()
.getAllWalQueueSize(backend) == 0;
return Env.getCurrentEnv().getGroupCommitManager().getAllWalQueueSize(backend) == 0;
}
private List<Backend> checkDecommission(DecommissionBackendClause decommissionBackendClause)

View File

@ -1140,7 +1140,7 @@ public class NativeInsertStmt extends InsertStmt {
return;
}
boolean partialUpdate = ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate();
if (!partialUpdate && ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit()
if (!isExplain() && !partialUpdate && ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit()
&& ConnectContext.get().getSessionVariable().getSqlMode() != SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES
&& targetTable instanceof OlapTable
&& ((OlapTable) targetTable).getTableProperty().getUseSchemaLightChange()

View File

@ -84,8 +84,7 @@ public class CheckWalSizeAction extends RestBaseController {
List<Backend> backends = getBackends(hostInfos);
List<String> backendsList = new ArrayList<>();
for (Backend backend : backends) {
long size = Env.getCurrentEnv().getGroupCommitManager()
.getAllWalQueueSize(backend);
long size = Env.getCurrentEnv().getGroupCommitManager().getAllWalQueueSize(backend);
backendsList.add(backend.getHost() + ":" + backend.getHeartbeatPort() + ":" + size);
}
return ResponseEntityBuilder.ok(backendsList);

View File

@ -88,12 +88,11 @@ public class LoadAction extends RestBaseController {
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
boolean groupCommit = false;
String groupCommitStr = request.getHeader("group_commit");
if (groupCommitStr != null && groupCommitStr.equals("async_mode")) {
if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) {
groupCommit = true;
try {
String[] pair = new String[] {db, table};
if (isGroupCommitBlock(pair)) {
String msg = "insert table " + pair[1] + " is blocked on schema change";
if (isGroupCommitBlock(db, table)) {
String msg = "insert table " + table + " is blocked on schema change";
return new RestBaseResult(msg);
}
} catch (Exception e) {
@ -122,19 +121,17 @@ public class LoadAction extends RestBaseController {
}
}
@RequestMapping(path = "/api/_http_stream",
method = RequestMethod.PUT)
public Object streamLoadWithSql(HttpServletRequest request,
HttpServletResponse response) {
@RequestMapping(path = "/api/_http_stream", method = RequestMethod.PUT)
public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse response) {
String sql = request.getHeader("sql");
LOG.info("streaming load sql={}", sql);
boolean groupCommit = false;
String groupCommitStr = request.getHeader("group_commit");
if (groupCommitStr != null && groupCommitStr.equals("async_mode")) {
if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) {
groupCommit = true;
try {
String[] pair = parseDbAndTb(sql);
if (isGroupCommitBlock(pair)) {
if (isGroupCommitBlock(pair[0], pair[1])) {
String msg = "insert table " + pair[1] + " is blocked on schema change";
return new RestBaseResult(msg);
}
@ -164,11 +161,11 @@ public class LoadAction extends RestBaseController {
}
}
private boolean isGroupCommitBlock(String[] pair) throws TException {
String fullDbName = getFullDbName(pair[0]);
private boolean isGroupCommitBlock(String db, String table) throws TException {
String fullDbName = getFullDbName(db);
Database dbObj = Env.getCurrentInternalCatalog()
.getDbOrException(fullDbName, s -> new TException("database is invalid for dbName: " + s));
Table tblObj = dbObj.getTableOrException(pair[1], s -> new TException("table is invalid: " + s));
Table tblObj = dbObj.getTableOrException(table, s -> new TException("table is invalid: " + s));
return Env.getCurrentEnv().getGroupCommitManager().isBlock(tblObj.getId());
}

View File

@ -17,7 +17,6 @@
package org.apache.doris.load;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
@ -30,31 +29,27 @@ import org.apache.doris.thrift.TStatusCode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Set;
import java.util.concurrent.Future;
public class GroupCommitManager {
public enum SchemaChangeStatus {
BLOCK, NORMAL
}
private static final Logger LOG = LogManager.getLogger(GroupCommitManager.class);
private final Map<Long, SchemaChangeStatus> statusMap = new ConcurrentHashMap<>();
private Set<Long> blockedTableIds = new HashSet<>();
public boolean isBlock(long tableId) {
if (statusMap.containsKey(tableId)) {
return statusMap.get(tableId) == SchemaChangeStatus.BLOCK;
}
return false;
return blockedTableIds.contains(tableId);
}
public void setStatus(long tableId, SchemaChangeStatus status) {
LOG.debug("Setting status for tableId {}: {}", tableId, status);
statusMap.put(tableId, status);
public void blockTable(long tableId) {
blockedTableIds.add(tableId);
}
public void unblockTable(long tableId) {
blockedTableIds.remove(tableId);
}
/**

View File

@ -153,9 +153,6 @@ public class GroupCommitPlanner {
}
}
PGroupCommitInsertRequest request = PGroupCommitInsertRequest.newBuilder()
.setDbId(db.getId())
.setTableId(table.getId())
.setBaseSchemaVersion(table.getBaseSchemaVersion())
.setExecPlanFragmentRequest(InternalService.PExecPlanFragmentRequest.newBuilder()
.setRequest(execPlanFragmentParamsBytes)
.setCompact(false).setVersion(InternalService.PFragmentRequestVersion.VERSION_2).build())