[Feature-wip](MySQL Load)Support cancel query for mysql load (#17233)

Notable changes:
1. Support cancelling a running MySQL load query.
2. Change the thread pool used by the MySQL load manager.
3. Fix the secure path check logic.
4. Fix some doc errors.
This commit is contained in:
huangzhaowei
2023-03-09 22:08:26 +08:00
committed by GitHub
parent 0c48bb4d66
commit 4ddd303cfc
9 changed files with 183 additions and 16 deletions

View File

@ -39,6 +39,7 @@ import com.google.common.collect.Lists;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -429,10 +430,15 @@ public class LoadStmt extends DdlStmt {
throw new AnalysisException("Load local data from fe local is not enabled. If you want to use it,"
+ " plz set the `mysql_load_server_secure_path` for FE to be a right path.");
} else {
if (!(path.startsWith(Config.mysql_load_server_secure_path))) {
throw new AnalysisException("Local file should be under the secure path of FE.");
File file = new File(path);
try {
if (!(file.getCanonicalPath().startsWith(Config.mysql_load_server_secure_path))) {
throw new AnalysisException("Local file should be under the secure path of FE.");
}
} catch (IOException e) {
throw new RuntimeException(e);
}
if (!new File(path).exists()) {
if (!file.exists()) {
throw new AnalysisException("File: " + path + " is not exists.");
}
}

View File

@ -22,6 +22,7 @@ import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.catalog.Env;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
@ -54,6 +55,7 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
public class MysqlLoadManager {
@ -62,12 +64,52 @@ public class MysqlLoadManager {
private final ThreadPoolExecutor mysqlLoadPool;
private final TokenManager tokenManager;
/**
 * Mutable state of one in-flight MySQL load, stored in {@code loadContextMap} under its load id.
 *
 * <p>The fields are written and read from different call sites without any common lock:
 * {@code finished} is set by the task submitted to {@code mysqlLoadPool}, {@code isCancelled}
 * is set by {@code cancelMySqlLoad}, and both are read by the method executing the load.
 * They are therefore declared {@code volatile} so that a write made on one thread is
 * visible to a read made on another.
 */
private static class MySqlLoadContext {
    // true once the client packet stream has been read to its end
    private volatile boolean finished;
    // the HTTP request carrying the load; aborted on cancel
    private volatile HttpPut request;
    // true once a cancel has been requested for this load
    private volatile boolean isCancelled;

    public MySqlLoadContext() {
        this.finished = false;
        this.isCancelled = false;
    }

    /** Returns whether the client data has been fully read. */
    public boolean isFinished() {
        return finished;
    }

    public void setFinished(boolean finished) {
        this.finished = finished;
    }

    /** Returns the HTTP request of this load, or {@code null} if none was set yet. */
    public HttpPut getRequest() {
        return request;
    }

    public void setRequest(HttpPut request) {
        this.request = request;
    }

    /** Returns whether a cancel was requested for this load. */
    public boolean isCancelled() {
        return isCancelled;
    }

    public void setCancelled(boolean cancelled) {
        isCancelled = cancelled;
    }
}
private final Map<String, MySqlLoadContext> loadContextMap = new ConcurrentHashMap<>();
public MysqlLoadManager(TokenManager tokenManager) {
this.mysqlLoadPool = ThreadPoolManager.newDaemonCacheThreadPool(4, "Mysql Load", true);
int poolSize = Config.mysql_load_thread_pool;
// MySqlLoad pool can accept 4 + 4 * 5 = 24 requests by default.
this.mysqlLoadPool = ThreadPoolManager.newDaemonFixedThreadPool(poolSize, poolSize * 5, "Mysql Load", true);
this.tokenManager = tokenManager;
}
public LoadJobRowResult executeMySqlLoadJobFromStmt(ConnectContext context, LoadStmt stmt)
public LoadJobRowResult executeMySqlLoadJobFromStmt(ConnectContext context, LoadStmt stmt, String loadId)
throws IOException, UserException {
LoadJobRowResult loadResult = new LoadJobRowResult();
// Mysql data load only have one data desc
@ -75,11 +117,21 @@ public class MysqlLoadManager {
List<String> filePaths = dataDesc.getFilePaths();
String database = ClusterNamespace.getNameFromFullName(dataDesc.getDbName());
String table = dataDesc.getTableName();
int oldTimeout = context.getExecTimeout();
int newTimeOut = extractTimeOut(dataDesc);
if (newTimeOut > oldTimeout) {
// raise the exec timeout so the load is not killed by the TimeoutChecker
context.setExecTimeout(newTimeOut);
}
String token = tokenManager.acquireToken();
LOG.info("execute MySqlLoadJob for id: {}.", loadId);
try (final CloseableHttpClient httpclient = HttpClients.createDefault()) {
for (String file : filePaths) {
InputStreamEntity entity = getInputStreamEntity(context, dataDesc.isClientLocal(), file);
InputStreamEntity entity = getInputStreamEntity(context, dataDesc.isClientLocal(), file, loadId);
HttpPut request = generateRequestForMySqlLoad(entity, dataDesc, database, table, token);
MySqlLoadContext loadContext = new MySqlLoadContext();
loadContext.setRequest(request);
loadContextMap.put(loadId, loadContext);
try (final CloseableHttpResponse response = httpclient.execute(request)) {
String body = EntityUtils.toString(response.getEntity());
JsonObject result = JsonParser.parseString(body).getAsJsonObject();
@ -91,10 +143,51 @@ public class MysqlLoadManager {
loadResult.incSkipped(result.get("NumberFilteredRows").getAsInt());
}
}
} catch (Throwable t) {
LOG.warn("Execute mysql load {} failed", loadId, t);
// drain the data from the client connection until an empty packet is received, otherwise the connection will be reset
if (loadContextMap.containsKey(loadId) && !loadContextMap.get(loadId).isFinished()) {
LOG.warn("not drained yet, try reading left data from client connection for load {}.", loadId);
ByteBuffer buffer = context.getMysqlChannel().fetchOnePacket();
// MySql client will send an empty packet when eof
while (buffer != null && buffer.limit() != 0) {
buffer = context.getMysqlChannel().fetchOnePacket();
}
LOG.debug("Finished reading the left bytes.");
}
// make cancel message to user
if (loadContextMap.containsKey(loadId) && loadContextMap.get(loadId).isCancelled()) {
throw new LoadException("Cancelled");
} else {
throw t;
}
} finally {
loadContextMap.remove(loadId);
// revert the exec timeout
if (newTimeOut > oldTimeout) {
context.setExecTimeout(oldTimeout);
}
}
return loadResult;
}
/**
 * Cancels the in-flight MySQL load registered under {@code loadId}, if there is one.
 *
 * <p>The load is flagged as cancelled and its HTTP request is aborted. When no load is
 * registered under the id, only an informational message is logged.
 *
 * @param loadId id the load was registered with in {@code loadContextMap}
 */
public void cancelMySqlLoad(String loadId) {
    // Look the context up exactly once: the executing method removes the entry when it
    // finishes, so a containsKey() check followed by separate get() calls could observe
    // the entry disappearing in between and dereference null.
    MySqlLoadContext loadContext = loadContextMap.get(loadId);
    if (loadContext == null) {
        LOG.info("Load id: {} may be already finished.", loadId);
        return;
    }
    // Set the flag before aborting so the failure caused by the abort is seen as a cancel.
    loadContext.setCancelled(true);
    loadContext.getRequest().abort();
    LOG.info("Cancel MySqlLoad with id {}", loadId);
}
/**
 * Reads the timeout configured in the properties of {@code desc}.
 *
 * @param desc data description whose properties are inspected
 * @return the parsed value of {@code LoadStmt.TIMEOUT_PROPERTY}, or {@code -1} when the
 *         description has no properties or does not contain that key
 * @throws NumberFormatException if the configured value is not a parsable integer
 */
public int extractTimeOut(DataDescription desc) {
    if (desc.getProperties() == null) {
        return -1;
    }
    if (!desc.getProperties().containsKey(LoadStmt.TIMEOUT_PROPERTY)) {
        return -1;
    }
    String timeoutText = desc.getProperties().get(LoadStmt.TIMEOUT_PROPERTY);
    return Integer.parseInt(timeoutText);
}
private String getColumns(DataDescription desc) {
if (desc.getFileFieldNames() != null) {
List<String> fields = desc.getFileFieldNames();
@ -114,14 +207,18 @@ public class MysqlLoadManager {
return null;
}
private InputStreamEntity getInputStreamEntity(ConnectContext context, boolean isClientLocal, String file)
private InputStreamEntity getInputStreamEntity(
ConnectContext context,
boolean isClientLocal,
String file,
String loadId)
throws IOException {
InputStream inputStream;
if (isClientLocal) {
// mysql client will check the file exist.
replyClientForReadFile(context, file);
inputStream = new ByteBufferNetworkInputStream();
fillByteBufferAsync(context, (ByteBufferNetworkInputStream) inputStream);
fillByteBufferAsync(context, (ByteBufferNetworkInputStream) inputStream, loadId);
} else {
// server side file had already check after analyze.
inputStream = Files.newInputStream(Paths.get(file));
@ -137,7 +234,7 @@ public class MysqlLoadManager {
context.getMysqlChannel().sendAndFlush(serializer.toByteBuffer());
}
private void fillByteBufferAsync(ConnectContext context, ByteBufferNetworkInputStream inputStream) {
private void fillByteBufferAsync(ConnectContext context, ByteBufferNetworkInputStream inputStream, String loadId) {
mysqlLoadPool.submit(() -> {
ByteBuffer buffer;
try {
@ -147,7 +244,11 @@ public class MysqlLoadManager {
inputStream.fillByteBuffer(buffer);
buffer = context.getMysqlChannel().fetchOnePacket();
}
if (loadContextMap.containsKey(loadId)) {
loadContextMap.get(loadId).setFinished(true);
}
} catch (IOException | InterruptedException e) {
LOG.warn("Failed fetch packet from mysql client for load: " + loadId, e);
throw new RuntimeException(e);
} finally {
inputStream.markFinished();

View File

@ -199,6 +199,7 @@ public class StmtExecutor implements ProfileWriter {
private QueryPlannerProfile plannerProfile = new QueryPlannerProfile();
private String stmtName;
private PrepareStmt prepareStmt;
private String mysqlLoadId;
// The result schema if "dry_run_query" is true.
// Only one column to indicate the real return row numbers.
@ -1010,6 +1011,9 @@ public class StmtExecutor implements ProfileWriter {
if (coordRef != null) {
coordRef.cancel();
}
if (mysqlLoadId != null) {
Env.getCurrentEnv().getLoadManager().getMysqlLoadManager().cancelMySqlLoad(mysqlLoadId);
}
}
// Handle kill statement.
@ -1900,8 +1904,10 @@ public class StmtExecutor implements ProfileWriter {
+ " to load client local file.");
return;
}
String loadId = UUID.randomUUID().toString();
mysqlLoadId = loadId;
LoadJobRowResult submitResult = loadManager.getMysqlLoadManager()
.executeMySqlLoadJobFromStmt(context, loadStmt);
.executeMySqlLoadJobFromStmt(context, loadStmt, loadId);
context.getState().setOk(submitResult.getRecords(), submitResult.getWarnings(),
submitResult.toString());
} else {

View File

@ -240,4 +240,47 @@ public class LoadStmtTest {
Assert.assertNull(stmt.getLabel().getDbName());
Assert.assertEquals(EtlJobType.LOCAL_FILE, stmt.getEtlJobType());
}
/**
 * Verifies that a server-side load file whose path leaves
 * {@code Config.mysql_load_server_secure_path} through a {@code ..} segment is rejected
 * by {@code LoadStmt.analyze} with the "secure path" error.
 */
@Test
public void testMySqlLoadPath(@Injectable DataDescription desc) throws UserException, IOException {
    File temp = File.createTempFile("testMySqlLoadData_path", ".txt");
    // the file is only needed to obtain a real directory; do not leave it behind
    temp.deleteOnExit();
    String parentPath = temp.getParent();
    // textually starts with parentPath but resolves to a sibling of it
    String fakePath = parentPath + "/../fake_path";
    new Expectations() {
        {
            desc.isClientLocal();
            minTimes = 0;
            result = false;

            desc.getFilePaths();
            minTimes = 0;
            result = Lists.newArrayList(fakePath);

            desc.toSql();
            minTimes = 0;
            result = "XXX";

            desc.getTableName();
            minTimes = 0;
            result = "testTbl";

            desc.analyzeFullDbName(null, (Analyzer) any);
            minTimes = 0;
            result = "testCluster:testDb";

            desc.getMergeType();
            minTimes = 0;
            result = LoadTask.MergeType.APPEND;
        }
    };

    LoadStmt stmt = new LoadStmt(desc, Maps.newHashMap(), "");
    // remember the global setting so this test does not leak it into other tests
    String oldSecurePath = Config.mysql_load_server_secure_path;
    Config.mysql_load_server_secure_path = parentPath;
    try {
        stmt.analyze(analyzer);
        // without this the test would pass silently if the path check were skipped
        Assert.fail("Expected AnalysisException for a file outside of the secure path.");
    } catch (AnalysisException ae) {
        Assert.assertEquals("errCode = 2, detailMessage = Local file should be under the secure path of FE.",
                ae.getMessage());
    } finally {
        Config.mysql_load_server_secure_path = oldSecurePath;
    }
}
}