[Feature-Wip](MySQL Load)Show load warning for my sql load (#18224)

1. Support the show load warnings for mysql load to get the detail error message.
2. Fix fillByteBufferAsync not mark the load as finished in same data load
3. Fix drain data only in client mode.
This commit is contained in:
huangzhaowei
2023-04-04 22:44:48 +08:00
committed by GitHub
parent e29fc3b46b
commit 7c36bef6bc
8 changed files with 185 additions and 10 deletions

View File

@ -39,6 +39,7 @@ import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.SystemInfoService;
import com.google.common.base.Joiner;
import com.google.common.collect.EvictingQueue;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.http.client.methods.CloseableHttpResponse;
@ -60,7 +61,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MysqlLoadManager {
private static final Logger LOG = LogManager.getLogger(MysqlLoadManager.class);
@ -103,14 +107,43 @@ public class MysqlLoadManager {
}
}
private final Map<String, MySqlLoadContext> loadContextMap = new ConcurrentHashMap<>();
private static class MySqlLoadFailRecord {
private final String label;
private final String errorUrl;
private final long startTime;
public MySqlLoadFailRecord(String label, String errorUrl) {
this.label = label;
this.errorUrl = errorUrl;
this.startTime = System.currentTimeMillis();
}
public String getLabel() {
return label;
}
public String getErrorUrl() {
return errorUrl;
}
public boolean isExpired() {
// hard code the expired value for one day.
return System.currentTimeMillis() > startTime + 24 * 60 * 60 * 1000;
}
}
private final Map<String, MySqlLoadContext> loadContextMap = new ConcurrentHashMap<>();
private final EvictingQueue<MySqlLoadFailRecord> failedRecords;
private ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1);
public MysqlLoadManager(TokenManager tokenManager) {
int poolSize = Config.mysql_load_thread_pool;
// MySqlLoad pool can accept 4 + 4 * 5 = 24 requests by default.
this.mysqlLoadPool = ThreadPoolManager.newDaemonFixedThreadPool(poolSize, poolSize * 5, "Mysql Load", true);
this.tokenManager = tokenManager;
this.failedRecords = EvictingQueue.create(Config.mysql_load_in_memory_record);
this.periodScheduler.scheduleAtFixedRate(this::cleanFailedRecords, 1, 24, TimeUnit.HOURS);
}
public LoadJobRowResult executeMySqlLoadJobFromStmt(ConnectContext context, LoadStmt stmt, String loadId)
@ -131,20 +164,22 @@ public class MysqlLoadManager {
new SetVar(SessionVariable.QUERY_TIMEOUT, new StringLiteral(String.valueOf(newTimeOut))));
}
String token = tokenManager.acquireToken();
boolean clientLocal = dataDesc.isClientLocal();
MySqlLoadContext loadContext = new MySqlLoadContext();
loadContextMap.put(loadId, loadContext);
LOG.info("execute MySqlLoadJob for id: {}.", loadId);
try (final CloseableHttpClient httpclient = HttpClients.createDefault()) {
for (String file : filePaths) {
InputStreamEntity entity = getInputStreamEntity(context, dataDesc.isClientLocal(), file, loadId);
InputStreamEntity entity = getInputStreamEntity(context, clientLocal, file, loadId);
HttpPut request = generateRequestForMySqlLoad(entity, dataDesc, database, table, token);
MySqlLoadContext loadContext = new MySqlLoadContext();
loadContext.setRequest(request);
loadContextMap.put(loadId, loadContext);
try (final CloseableHttpResponse response = httpclient.execute(request)) {
String body = EntityUtils.toString(response.getEntity());
JsonObject result = JsonParser.parseString(body).getAsJsonObject();
if (!result.get("Status").getAsString().equalsIgnoreCase("Success")) {
failedRecords.offer(new MySqlLoadFailRecord(loadId, result.get("ErrorURL").getAsString()));
LOG.warn("Execute mysql data load failed with request: {} and response: {}", request, body);
throw new LoadException(result.get("Message").getAsString());
throw new LoadException(result.get("Message").getAsString() + " with load id " + loadId);
}
loadResult.incRecords(result.get("NumberLoadedRows").getAsLong());
loadResult.incSkipped(result.get("NumberFilteredRows").getAsInt());
@ -153,7 +188,7 @@ public class MysqlLoadManager {
} catch (Throwable t) {
LOG.warn("Execute mysql load {} failed", loadId, t);
// drain the data from client conn util empty packet received, otherwise the connection will be reset
if (loadContextMap.containsKey(loadId) && !loadContextMap.get(loadId).isFinished()) {
if (clientLocal && loadContextMap.containsKey(loadId) && !loadContextMap.get(loadId).isFinished()) {
LOG.warn("not drained yet, try reading left data from client connection for load {}.", loadId);
ByteBuffer buffer = context.getMysqlChannel().fetchOnePacket();
// MySql client will send an empty packet when eof
@ -184,7 +219,22 @@ public class MysqlLoadManager {
}
}
public int extractTimeOut(DataDescription desc) {
public String getErrorUrlByLoadId(String loadId) {
for (MySqlLoadFailRecord record : failedRecords) {
if (loadId.equals(record.getLabel())) {
return record.getErrorUrl();
}
}
return null;
}
private void cleanFailedRecords() {
while (!failedRecords.isEmpty() && failedRecords.peek().isExpired()) {
failedRecords.poll();
}
}
private int extractTimeOut(DataDescription desc) {
if (desc.getProperties() != null && desc.getProperties().containsKey(LoadStmt.TIMEOUT_PROPERTY)) {
return Integer.parseInt(desc.getProperties().get(LoadStmt.TIMEOUT_PROPERTY));
}

View File

@ -210,6 +210,7 @@ import org.apache.logging.log4j.Logger;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.ArrayList;
@ -1284,6 +1285,22 @@ public class ShowExecutor {
}
Env env = Env.getCurrentEnv();
// try to fetch load id from mysql load first and mysql load only support find by label.
if (showWarningsStmt.isFindByLabel()) {
String label = showWarningsStmt.getLabel();
String urlString = env.getLoadManager().getMysqlLoadManager().getErrorUrlByLoadId(label);
if (urlString != null && !urlString.isEmpty()) {
URL url;
try {
url = new URL(urlString);
} catch (MalformedURLException e) {
throw new AnalysisException("Invalid url: " + e.getMessage());
}
handleShowLoadWarningsFromURL(showWarningsStmt, url);
return;
}
}
Database db = env.getInternalCatalog().getDbOrAnalysisException(showWarningsStmt.getDbName());
long dbId = db.getId();