[improvement](insert) support schema change and decommission for group commit (#26359)
This commit is contained in:
@ -40,6 +40,7 @@ import org.apache.doris.catalog.Tablet;
|
||||
import org.apache.doris.catalog.TabletInvertedIndex;
|
||||
import org.apache.doris.catalog.TabletMeta;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.MarkedCountDownLatch;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
@ -47,6 +48,7 @@ import org.apache.doris.common.SchemaVersionAndHash;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.util.DbUtil;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.load.GroupCommitManager.SchemaChangeStatus;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
import org.apache.doris.task.AgentBatchTask;
|
||||
import org.apache.doris.task.AgentTask;
|
||||
@ -322,7 +324,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
|
||||
} else {
|
||||
// only show at most 3 results
|
||||
List<String> subList = countDownLatch.getLeftMarks().stream().limit(3)
|
||||
.map(item -> "(backendId = " + item.getKey() + ", tabletId = " + item.getValue() + ")")
|
||||
.map(item -> "(backendId = " + item.getKey() + ", tabletId = " + item.getValue() + ")")
|
||||
.collect(Collectors.toList());
|
||||
errMsg = "Error replicas:" + Joiner.on(", ").join(subList);
|
||||
}
|
||||
@ -529,12 +531,15 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
|
||||
int failedTaskCount = failedAgentTasks.get(task.getTabletId()).size();
|
||||
if (expectSucceedTaskNum - failedTaskCount < expectSucceedTaskNum / 2 + 1) {
|
||||
throw new AlterCancelException("schema change tasks failed on same tablet reach threshold "
|
||||
+ failedAgentTasks.get(task.getTabletId()));
|
||||
+ failedAgentTasks.get(task.getTabletId()));
|
||||
}
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
long maxWalId = Env.getCurrentGlobalTransactionMgr()
|
||||
.getTransactionIDGenerator().getNextTransactionId();
|
||||
waitWalFinished(maxWalId);
|
||||
/*
|
||||
* all tasks are finished. check the integrity.
|
||||
* we just check whether all new replicas are healthy.
|
||||
@ -580,7 +585,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
|
||||
} // end for tablets
|
||||
}
|
||||
} // end for partitions
|
||||
|
||||
// all partitions are good
|
||||
onFinished(tbl);
|
||||
} finally {
|
||||
@ -598,6 +602,35 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
|
||||
LOG.info("set table's state to NORMAL, table id: {}, job id: {}", tableId, jobId);
|
||||
}
|
||||
|
||||
private void waitWalFinished(long maxWalId) {
|
||||
// wait wal done here
|
||||
Env.getCurrentEnv().getGroupCommitManager().setStatus(tableId, SchemaChangeStatus.BLOCK);
|
||||
LOG.info("block table {}", tableId);
|
||||
List<Long> aliveBeIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
|
||||
long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold;
|
||||
boolean walFinished = false;
|
||||
while (System.currentTimeMillis() < expireTime) {
|
||||
LOG.info("wai for wal queue size to be empty");
|
||||
walFinished = Env.getCurrentEnv().getGroupCommitManager()
|
||||
.isPreviousWalFinished(tableId, maxWalId, aliveBeIds);
|
||||
if (walFinished) {
|
||||
LOG.info("all wal is finished");
|
||||
break;
|
||||
} else {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.info("schema change job sleep wait for wal InterruptedException: ", ie);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!walFinished) {
|
||||
LOG.warn("waitWalFinished time out");
|
||||
}
|
||||
Env.getCurrentEnv().getGroupCommitManager().setStatus(tableId, SchemaChangeStatus.NORMAL);
|
||||
LOG.info("release table {}", tableId);
|
||||
}
|
||||
|
||||
private void onFinished(OlapTable tbl) {
|
||||
// replace the origin index with shadow index, set index state as NORMAL
|
||||
for (Partition partition : tbl.getPartitions()) {
|
||||
|
||||
@ -83,7 +83,7 @@ public class SystemHandler extends AlterHandler {
|
||||
}
|
||||
|
||||
List<Long> backendTabletIds = invertedIndex.getTabletIdsByBackendId(beId);
|
||||
if (Config.drop_backend_after_decommission && checkTablets(beId, backendTabletIds)) {
|
||||
if (Config.drop_backend_after_decommission && checkTablets(beId, backendTabletIds) && checkWal(backend)) {
|
||||
try {
|
||||
systemInfoService.dropBackend(beId);
|
||||
LOG.info("no available tablet on decommission backend {}, drop it", beId);
|
||||
@ -196,6 +196,11 @@ public class SystemHandler extends AlterHandler {
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean checkWal(Backend backend) {
|
||||
return Env.getCurrentEnv().getGroupCommitManager()
|
||||
.getAllWalQueueSize(backend) == 0;
|
||||
}
|
||||
|
||||
private List<Backend> checkDecommission(DecommissionBackendClause decommissionBackendClause)
|
||||
throws DdlException {
|
||||
if (decommissionBackendClause.getHostInfos().isEmpty()) {
|
||||
|
||||
@ -155,6 +155,7 @@ import org.apache.doris.journal.bdbje.Timestamp;
|
||||
import org.apache.doris.load.DeleteHandler;
|
||||
import org.apache.doris.load.ExportJob;
|
||||
import org.apache.doris.load.ExportMgr;
|
||||
import org.apache.doris.load.GroupCommitManager;
|
||||
import org.apache.doris.load.Load;
|
||||
import org.apache.doris.load.StreamLoadRecordMgr;
|
||||
import org.apache.doris.load.loadv2.LoadEtlChecker;
|
||||
@ -333,6 +334,7 @@ public class Env {
|
||||
private StreamLoadRecordMgr streamLoadRecordMgr;
|
||||
private TabletLoadIndexRecorderMgr tabletLoadIndexRecorderMgr;
|
||||
private RoutineLoadManager routineLoadManager;
|
||||
private GroupCommitManager groupCommitManager;
|
||||
private SqlBlockRuleMgr sqlBlockRuleMgr;
|
||||
private ExportMgr exportMgr;
|
||||
private SyncJobManager syncJobManager;
|
||||
@ -603,6 +605,7 @@ public class Env {
|
||||
this.catalogMgr = new CatalogMgr();
|
||||
this.load = new Load();
|
||||
this.routineLoadManager = new RoutineLoadManager();
|
||||
this.groupCommitManager = new GroupCommitManager();
|
||||
this.sqlBlockRuleMgr = new SqlBlockRuleMgr();
|
||||
this.exportMgr = new ExportMgr();
|
||||
this.syncJobManager = new SyncJobManager();
|
||||
@ -3778,6 +3781,10 @@ public class Env {
|
||||
return routineLoadManager;
|
||||
}
|
||||
|
||||
public GroupCommitManager getGroupCommitManager() {
|
||||
return groupCommitManager;
|
||||
}
|
||||
|
||||
public SqlBlockRuleMgr getSqlBlockRuleMgr() {
|
||||
return sqlBlockRuleMgr;
|
||||
}
|
||||
|
||||
@ -0,0 +1,113 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.httpv2.rest;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.system.SystemInfoService.HostInfo;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMethod;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
/**
|
||||
* cal wal size of specific be
|
||||
* fe_host:fe_http_port/api/get_wal_size?host_ports=host:port,host2:port2...
|
||||
* return:
|
||||
* {
|
||||
* "msg": "OK",
|
||||
* "code": 0,
|
||||
* "data": ["192.168.10.11:9050:1", "192.168.10.11:9050:0"],
|
||||
* "count": 0
|
||||
* }
|
||||
*/
|
||||
|
||||
@RestController
|
||||
public class CheckWalSizeAction extends RestBaseController {
|
||||
public static final String HOST_PORTS = "host_ports";
|
||||
|
||||
@RequestMapping(path = "/api/get_wal_size", method = RequestMethod.GET)
|
||||
public Object execute(HttpServletRequest request, HttpServletResponse response) {
|
||||
// check user auth
|
||||
executeCheckPassword(request, response);
|
||||
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.OPERATOR);
|
||||
|
||||
String hostPorts = request.getParameter(HOST_PORTS);
|
||||
if (Strings.isNullOrEmpty(hostPorts)) {
|
||||
return ResponseEntityBuilder.badRequest("No host:port specified");
|
||||
}
|
||||
|
||||
String[] hostPortArr = hostPorts.split(",");
|
||||
if (hostPortArr.length == 0) {
|
||||
return ResponseEntityBuilder.badRequest("No host:port specified");
|
||||
}
|
||||
|
||||
List<HostInfo> hostInfos = Lists.newArrayList();
|
||||
for (String hostPort : hostPortArr) {
|
||||
try {
|
||||
HostInfo hostInfo = SystemInfoService.getHostAndPort(hostPort);
|
||||
hostInfos.add(hostInfo);
|
||||
} catch (AnalysisException e) {
|
||||
return ResponseEntityBuilder.badRequest(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
List<Backend> backends = getBackends(hostInfos);
|
||||
List<String> backendsList = new ArrayList<>();
|
||||
for (Backend backend : backends) {
|
||||
long size = Env.getCurrentEnv().getGroupCommitManager()
|
||||
.getAllWalQueueSize(backend);
|
||||
backendsList.add(backend.getHost() + ":" + backend.getHeartbeatPort() + ":" + size);
|
||||
}
|
||||
return ResponseEntityBuilder.ok(backendsList);
|
||||
} catch (DdlException e) {
|
||||
return ResponseEntityBuilder.okWithCommonError(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private List<Backend> getBackends(List<HostInfo> hostInfos) throws DdlException {
|
||||
SystemInfoService infoService = Env.getCurrentSystemInfo();
|
||||
List<Backend> backends = Lists.newArrayList();
|
||||
// check if exist
|
||||
for (HostInfo hostInfo : hostInfos) {
|
||||
Backend backend = infoService.getBackendWithHeartbeatPort(hostInfo.getHost(),
|
||||
hostInfo.getPort());
|
||||
if (backend == null) {
|
||||
throw new DdlException("Backend does not exist["
|
||||
+ hostInfo.getHost()
|
||||
+ ":" + hostInfo.getPort() + "]");
|
||||
}
|
||||
backends.add(backend);
|
||||
}
|
||||
return backends;
|
||||
}
|
||||
}
|
||||
@ -17,7 +17,9 @@
|
||||
|
||||
package org.apache.doris.httpv2.rest;
|
||||
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.cluster.ClusterNamespace;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
@ -38,6 +40,7 @@ import com.google.common.base.Strings;
|
||||
import io.netty.handler.codec.http.HttpHeaderNames;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.thrift.TException;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
@ -73,7 +76,7 @@ public class LoadAction extends RestBaseController {
|
||||
return entity;
|
||||
} else {
|
||||
executeCheckPassword(request, response);
|
||||
return executeWithoutPassword(request, response, db, table, false);
|
||||
return executeWithoutPassword(request, response, db, table, false, false);
|
||||
}
|
||||
}
|
||||
|
||||
@ -81,6 +84,22 @@ public class LoadAction extends RestBaseController {
|
||||
public Object streamLoad(HttpServletRequest request,
|
||||
HttpServletResponse response,
|
||||
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
|
||||
boolean groupCommit = false;
|
||||
String groupCommitStr = request.getHeader("group_commit");
|
||||
if (groupCommitStr != null && groupCommitStr.equals("true")) {
|
||||
groupCommit = true;
|
||||
try {
|
||||
String[] pair = new String[] {db, table};
|
||||
LOG.info(pair[0] + ":" + pair[1]);
|
||||
if (isGroupCommitBlock(pair)) {
|
||||
String msg = "insert table " + pair[1] + " is blocked on schema change";
|
||||
return new RestBaseResult(msg);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.info("exception:" + e);
|
||||
return new RestBaseResult(e.getMessage());
|
||||
}
|
||||
}
|
||||
if (needRedirect(request.getScheme())) {
|
||||
return redirectToHttps(request);
|
||||
}
|
||||
@ -99,7 +118,7 @@ public class LoadAction extends RestBaseController {
|
||||
}
|
||||
}
|
||||
|
||||
return executeWithoutPassword(request, response, db, table, true);
|
||||
return executeWithoutPassword(request, response, db, table, true, groupCommit);
|
||||
}
|
||||
|
||||
@RequestMapping(path = "/api/_http_stream",
|
||||
@ -108,6 +127,21 @@ public class LoadAction extends RestBaseController {
|
||||
HttpServletResponse response) {
|
||||
String sql = request.getHeader("sql");
|
||||
LOG.info("streaming load sql={}", sql);
|
||||
boolean groupCommit = false;
|
||||
String groupCommitStr = request.getHeader("group_commit");
|
||||
if (groupCommitStr != null && groupCommitStr.equals("true")) {
|
||||
groupCommit = true;
|
||||
try {
|
||||
String[] pair = parseDbAndTb(sql);
|
||||
if (isGroupCommitBlock(pair)) {
|
||||
String msg = "insert table " + pair[1] + " is blocked on schema change";
|
||||
return new RestBaseResult(msg);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.info("exception:" + e);
|
||||
return new RestBaseResult(e.getMessage());
|
||||
}
|
||||
}
|
||||
executeCheckPassword(request, response);
|
||||
try {
|
||||
// A 'Load' request must have 100-continue header
|
||||
@ -122,7 +156,7 @@ public class LoadAction extends RestBaseController {
|
||||
|
||||
String label = request.getHeader(LABEL_KEY);
|
||||
TNetworkAddress redirectAddr;
|
||||
redirectAddr = selectRedirectBackend(clusterName);
|
||||
redirectAddr = selectRedirectBackend(clusterName, groupCommit);
|
||||
|
||||
LOG.info("redirect load action to destination={}, label: {}",
|
||||
redirectAddr.toString(), label);
|
||||
@ -134,6 +168,43 @@ public class LoadAction extends RestBaseController {
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isGroupCommitBlock(String[] pair) throws TException {
|
||||
String fullDbName = getFullDbName(pair[0]);
|
||||
Database dbObj = Env.getCurrentInternalCatalog()
|
||||
.getDbOrException(fullDbName, s -> new TException("database is invalid for dbName: " + s));
|
||||
Table tblObj = dbObj.getTableOrException(pair[1], s -> new TException("table is invalid: " + s));
|
||||
return Env.getCurrentEnv().getGroupCommitManager().isBlock(tblObj.getId());
|
||||
}
|
||||
|
||||
private String[] parseDbAndTb(String sql) throws Exception {
|
||||
String[] array = sql.split(" ");
|
||||
String tmp = null;
|
||||
int count = 0;
|
||||
for (String s : array) {
|
||||
if (!s.equals("")) {
|
||||
count++;
|
||||
if (count == 3) {
|
||||
tmp = s;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (tmp == null) {
|
||||
throw new Exception("parse db and tb with wrong sql:" + sql);
|
||||
}
|
||||
String pairStr = null;
|
||||
if (tmp.contains("(")) {
|
||||
pairStr = tmp.split("\\(")[0];
|
||||
} else {
|
||||
pairStr = tmp;
|
||||
}
|
||||
String[] pair = pairStr.split("\\.");
|
||||
if (pair.length != 2) {
|
||||
throw new Exception("parse db and tb with wrong sql:" + sql);
|
||||
}
|
||||
return pair;
|
||||
}
|
||||
|
||||
@RequestMapping(path = "/api/{" + DB_KEY + "}/_stream_load_2pc", method = RequestMethod.PUT)
|
||||
public Object streamLoad2PC(HttpServletRequest request,
|
||||
HttpServletResponse response,
|
||||
@ -162,7 +233,7 @@ public class LoadAction extends RestBaseController {
|
||||
// Same as Multi load, to be compatible with http v1's response body,
|
||||
// we return error by using RestBaseResult.
|
||||
private Object executeWithoutPassword(HttpServletRequest request,
|
||||
HttpServletResponse response, String db, String table, boolean isStreamLoad) {
|
||||
HttpServletResponse response, String db, String table, boolean isStreamLoad, boolean groupCommit) {
|
||||
try {
|
||||
String dbName = db;
|
||||
String tableName = table;
|
||||
@ -213,7 +284,7 @@ public class LoadAction extends RestBaseController {
|
||||
return new RestBaseResult(e.getMessage());
|
||||
}
|
||||
} else {
|
||||
redirectAddr = selectRedirectBackend(clusterName);
|
||||
redirectAddr = selectRedirectBackend(clusterName, groupCommit);
|
||||
}
|
||||
|
||||
LOG.info("redirect load action to destination={}, stream: {}, db: {}, tbl: {}, label: {}",
|
||||
@ -249,7 +320,7 @@ public class LoadAction extends RestBaseController {
|
||||
return new RestBaseResult("No transaction operation(\'commit\' or \'abort\') selected.");
|
||||
}
|
||||
|
||||
TNetworkAddress redirectAddr = selectRedirectBackend(clusterName);
|
||||
TNetworkAddress redirectAddr = selectRedirectBackend(clusterName, false);
|
||||
LOG.info("redirect stream load 2PC action to destination={}, db: {}, txn: {}, operation: {}",
|
||||
redirectAddr.toString(), dbName, request.getHeader(TXN_ID_KEY), txnOperation);
|
||||
|
||||
@ -261,18 +332,30 @@ public class LoadAction extends RestBaseController {
|
||||
}
|
||||
}
|
||||
|
||||
private TNetworkAddress selectRedirectBackend(String clusterName) throws LoadException {
|
||||
String qualifiedUser = ConnectContext.get().getQualifiedUser();
|
||||
Set<Tag> userTags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
|
||||
BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
|
||||
.addTags(userTags)
|
||||
.needLoadAvailable().build();
|
||||
List<Long> backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
|
||||
if (backendIds.isEmpty()) {
|
||||
throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
|
||||
private TNetworkAddress selectRedirectBackend(String clusterName, boolean groupCommit) throws LoadException {
|
||||
Backend backend = null;
|
||||
BeSelectionPolicy policy = null;
|
||||
if (groupCommit) {
|
||||
List<Long> allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
|
||||
for (Long backendId : allBackendIds) {
|
||||
Backend candidateBe = Env.getCurrentSystemInfo().getBackend(backendId);
|
||||
if (!candidateBe.isDecommissioned()) {
|
||||
backend = candidateBe;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
String qualifiedUser = ConnectContext.get().getQualifiedUser();
|
||||
Set<Tag> userTags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
|
||||
policy = new BeSelectionPolicy.Builder()
|
||||
.addTags(userTags)
|
||||
.needLoadAvailable().build();
|
||||
List<Long> backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
|
||||
if (backendIds.isEmpty()) {
|
||||
throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
|
||||
}
|
||||
backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
|
||||
}
|
||||
|
||||
Backend backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
|
||||
if (backend == null) {
|
||||
throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
|
||||
}
|
||||
@ -340,7 +423,7 @@ public class LoadAction extends RestBaseController {
|
||||
return new RestBaseResult("No label selected.");
|
||||
}
|
||||
|
||||
TNetworkAddress redirectAddr = selectRedirectBackend(clusterName);
|
||||
TNetworkAddress redirectAddr = selectRedirectBackend(clusterName, false);
|
||||
|
||||
LOG.info("Redirect load action with auth token to destination={},"
|
||||
+ "stream: {}, db: {}, tbl: {}, label: {}",
|
||||
|
||||
@ -0,0 +1,145 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.load;
|
||||
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
|
||||
import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse;
|
||||
import org.apache.doris.rpc.BackendServiceProxy;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.thrift.TStatusCode;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
public class GroupCommitManager {
|
||||
|
||||
public enum SchemaChangeStatus {
|
||||
BLOCK, NORMAL
|
||||
}
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(GroupCommitManager.class);
|
||||
|
||||
private final Map<Long, SchemaChangeStatus> statusMap = new ConcurrentHashMap<>();
|
||||
|
||||
public boolean isBlock(long tableId) {
|
||||
if (statusMap.containsKey(tableId)) {
|
||||
return statusMap.get(tableId) == SchemaChangeStatus.BLOCK;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public void setStatus(long tableId, SchemaChangeStatus status) {
|
||||
LOG.debug("Setting status for tableId {}: {}", tableId, status);
|
||||
statusMap.put(tableId, status);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check the wal before the endTransactionId is finished or not.
|
||||
*/
|
||||
public boolean isPreviousWalFinished(long tableId, long endTransactionId, List<Long> aliveBeIds) {
|
||||
boolean empty = true;
|
||||
for (int i = 0; i < aliveBeIds.size(); i++) {
|
||||
Backend backend = Env.getCurrentSystemInfo().getBackend(aliveBeIds.get(i));
|
||||
// in ut port is -1, skip checking
|
||||
if (backend.getBrpcPort() < 0) {
|
||||
return true;
|
||||
}
|
||||
PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder()
|
||||
.setTableId(tableId)
|
||||
.setTxnId(endTransactionId)
|
||||
.build();
|
||||
long size = getWallQueueSize(backend, request);
|
||||
if (size > 0) {
|
||||
LOG.info("backend id:" + backend.getId() + ",wal size:" + size);
|
||||
empty = false;
|
||||
}
|
||||
}
|
||||
return empty;
|
||||
}
|
||||
|
||||
public long getAllWalQueueSize(Backend backend) {
|
||||
PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder()
|
||||
.setTableId(-1)
|
||||
.setTxnId(-1)
|
||||
.build();
|
||||
long size = getWallQueueSize(backend, request);
|
||||
if (size > 0) {
|
||||
LOG.info("backend id:" + backend.getId() + ",all wal size:" + size);
|
||||
}
|
||||
return size;
|
||||
}
|
||||
|
||||
public long getWallQueueSize(Backend backend, PGetWalQueueSizeRequest request) {
|
||||
PGetWalQueueSizeResponse response = null;
|
||||
long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold;
|
||||
long size = 0;
|
||||
while (System.currentTimeMillis() <= expireTime) {
|
||||
if (!backend.isAlive()) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.info("group commit manager sleep wait InterruptedException: ", ie);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
Future<PGetWalQueueSizeResponse> future = BackendServiceProxy.getInstance()
|
||||
.getWalQueueSize(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request);
|
||||
response = future.get();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("encounter exception while getting wal queue size on backend id: " + backend.getId()
|
||||
+ ",exception:" + e);
|
||||
String msg = e.getMessage();
|
||||
if (msg.contains("Method") && msg.contains("unimplemented")) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.info("group commit manager sleep wait InterruptedException: ", ie);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
|
||||
if (code != TStatusCode.OK) {
|
||||
String msg = "get all queue size fail,backend id: " + backend.getId() + ", status: "
|
||||
+ response.getStatus();
|
||||
LOG.warn(msg);
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.info("group commit manager sleep wait InterruptedException: ", ie);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
size = response.getSize();
|
||||
break;
|
||||
}
|
||||
return size;
|
||||
}
|
||||
|
||||
}
|
||||
@ -149,6 +149,7 @@ import org.apache.doris.rpc.RpcException;
|
||||
import org.apache.doris.service.FrontendOptions;
|
||||
import org.apache.doris.statistics.ResultRow;
|
||||
import org.apache.doris.statistics.util.InternalQueryBuffer;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.task.LoadEtlTask;
|
||||
import org.apache.doris.thrift.TFileFormatType;
|
||||
import org.apache.doris.thrift.TFileType;
|
||||
@ -1854,7 +1855,33 @@ public class StmtExecutor {
|
||||
txnId = context.getTxnEntry().getTxnConf().getTxnId();
|
||||
} else if (insertStmt instanceof NativeInsertStmt && ((NativeInsertStmt) insertStmt).isGroupCommit()) {
|
||||
isGroupCommit = true;
|
||||
if (Env.getCurrentEnv().getGroupCommitManager().isBlock(insertStmt.getTargetTable().getId())) {
|
||||
String msg = "insert table " + insertStmt.getTargetTable().getId() + " is blocked on schema change";
|
||||
LOG.info(msg);
|
||||
throw new DdlException(msg);
|
||||
}
|
||||
NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) insertStmt;
|
||||
Backend backend = context.getInsertGroupCommit(insertStmt.getTargetTable().getId());
|
||||
if (backend == null || !backend.isAlive() || backend.isDecommissioned()) {
|
||||
List<Long> allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
|
||||
if (allBackendIds.isEmpty()) {
|
||||
throw new DdlException("No alive backend");
|
||||
}
|
||||
Collections.shuffle(allBackendIds);
|
||||
boolean find = false;
|
||||
for (Long beId : allBackendIds) {
|
||||
backend = Env.getCurrentSystemInfo().getBackend(beId);
|
||||
if (!backend.isDecommissioned()) {
|
||||
context.setInsertGroupCommit(insertStmt.getTargetTable().getId(), backend);
|
||||
find = true;
|
||||
LOG.debug("choose new be {}", backend.getId());
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!find) {
|
||||
throw new DdlException("No suitable backend");
|
||||
}
|
||||
}
|
||||
int maxRetry = 3;
|
||||
for (int i = 0; i < maxRetry; i++) {
|
||||
GroupCommitPlanner groupCommitPlanner = nativeInsertStmt.planForGroupCommit(context.queryId);
|
||||
|
||||
@ -161,6 +161,12 @@ public class BackendServiceClient {
|
||||
return stub.groupCommitInsert(request);
|
||||
}
|
||||
|
||||
public Future<InternalService.PGetWalQueueSizeResponse> getWalQueueSize(
|
||||
InternalService.PGetWalQueueSizeRequest request) {
|
||||
return stub.getWalQueueSize(request);
|
||||
}
|
||||
|
||||
|
||||
public void shutdown() {
|
||||
if (!channel.isShutdown()) {
|
||||
channel.shutdown();
|
||||
|
||||
@ -23,6 +23,8 @@ import org.apache.doris.common.util.NetUtils;
|
||||
import org.apache.doris.metric.MetricRepo;
|
||||
import org.apache.doris.proto.InternalService;
|
||||
import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest;
|
||||
import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
|
||||
import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse;
|
||||
import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest;
|
||||
import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
|
||||
import org.apache.doris.proto.Types;
|
||||
@ -451,4 +453,18 @@ public class BackendServiceProxy {
|
||||
throw new RpcException(address.hostname, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public Future<PGetWalQueueSizeResponse> getWalQueueSize(TNetworkAddress address,
|
||||
PGetWalQueueSizeRequest request) throws RpcException {
|
||||
try {
|
||||
final BackendServiceClient client = getProxy(address);
|
||||
return client.getWalQueueSize(request);
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("failed to get wal queue size from address={}:{}", address.getHostname(),
|
||||
address.getPort(), e);
|
||||
throw new RpcException(address.hostname, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@ -143,6 +143,8 @@ import org.apache.doris.thrift.TGetBackendMetaResult;
|
||||
import org.apache.doris.thrift.TGetBinlogLagResult;
|
||||
import org.apache.doris.thrift.TGetBinlogRequest;
|
||||
import org.apache.doris.thrift.TGetBinlogResult;
|
||||
import org.apache.doris.thrift.TGetColumnInfoRequest;
|
||||
import org.apache.doris.thrift.TGetColumnInfoResult;
|
||||
import org.apache.doris.thrift.TGetDbsParams;
|
||||
import org.apache.doris.thrift.TGetDbsResult;
|
||||
import org.apache.doris.thrift.TGetMasterTokenRequest;
|
||||
@ -3479,6 +3481,45 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public TGetColumnInfoResult getColumnInfo(TGetColumnInfoRequest request) {
|
||||
TGetColumnInfoResult result = new TGetColumnInfoResult();
|
||||
TStatus errorStatus = new TStatus(TStatusCode.RUNTIME_ERROR);
|
||||
long dbId = request.getDbId();
|
||||
long tableId = request.getTableId();
|
||||
if (!Env.getCurrentEnv().isMaster()) {
|
||||
errorStatus.setStatusCode(TStatusCode.NOT_MASTER);
|
||||
errorStatus.addToErrorMsgs(NOT_MASTER_ERR_MSG);
|
||||
LOG.error("failed to getColumnInfo: {}", NOT_MASTER_ERR_MSG);
|
||||
return result;
|
||||
}
|
||||
|
||||
Database db = Env.getCurrentEnv().getInternalCatalog().getDbNullable(dbId);
|
||||
if (db == null) {
|
||||
errorStatus.setErrorMsgs(Lists.newArrayList(String.format("dbId=%d is not exists", dbId)));
|
||||
result.setStatus(errorStatus);
|
||||
return result;
|
||||
}
|
||||
|
||||
Table table = db.getTable(tableId).get();
|
||||
if (table == null) {
|
||||
errorStatus.setErrorMsgs(
|
||||
(Lists.newArrayList(String.format("dbId=%d tableId=%d is not exists", dbId, tableId))));
|
||||
result.setStatus(errorStatus);
|
||||
return result;
|
||||
}
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (Column column : table.getFullSchema()) {
|
||||
sb.append(column.getName() + ":" + column.getUniqueId() + ",");
|
||||
}
|
||||
String columnInfo = sb.toString();
|
||||
columnInfo = columnInfo.substring(0, columnInfo.length() - 1);
|
||||
result.setStatus(new TStatus(TStatusCode.OK));
|
||||
result.setColumnInfo(columnInfo);
|
||||
return result;
|
||||
}
|
||||
|
||||
public TGetBackendMetaResult getBackendMeta(TGetBackendMetaRequest request) throws TException {
|
||||
String clientAddr = getClientAddrAsString();
|
||||
LOG.debug("receive get backend meta request: {}", request);
|
||||
|
||||
Reference in New Issue
Block a user