[Function] Add timeout of connection when downloading the function objectFile from URL (#5135)

Co-authored-by: caiconghui [蔡聪辉] <caiconghui@xiaomi.com>
This commit is contained in:
caiconghui
2021-01-06 22:43:52 +08:00
committed by GitHub
parent 1035e86e0b
commit 65d33cf43c

View File

@ -26,6 +26,7 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
@ -36,8 +37,6 @@ import org.apache.commons.codec.binary.Hex;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
@ -69,6 +68,9 @@ public class CreateFunctionStmt extends DdlStmt {
private Function function;
private String checksum;
// timeout for both connection and read. 10 seconds is long enough.
private static final int HTTP_TIMEOUT_MS = 10000;
public CreateFunctionStmt(boolean isAggregate, FunctionName functionName, FunctionArgsDef argsDef,
TypeDef returnType, TypeDef intermediateType, Map<String, String> properties) {
this.functionName = functionName;
@ -139,22 +141,21 @@ public class CreateFunctionStmt extends DdlStmt {
checksum = "";
return;
}
URL url = new URL(objectFile);
URLConnection urlConnection = url.openConnection();
InputStream inputStream = urlConnection.getInputStream();
MessageDigest digest = MessageDigest.getInstance("MD5");
byte[] buf = new byte[4096];
int bytesRead = 0;
do {
bytesRead = inputStream.read(buf);
if (bytesRead < 0) {
break;
}
digest.update(buf, 0, bytesRead);
} while (true);
try (InputStream inputStream = Util.getInputStreamFromUrl(objectFile, null, HTTP_TIMEOUT_MS, HTTP_TIMEOUT_MS)) {
MessageDigest digest = MessageDigest.getInstance("MD5");
byte[] buf = new byte[4096];
int bytesRead = 0;
do {
bytesRead = inputStream.read(buf);
if (bytesRead < 0) {
break;
}
digest.update(buf, 0, bytesRead);
} while (true);
checksum = Hex.encodeHexString(digest.digest());
checksum = Hex.encodeHexString(digest.digest());
}
}
private void analyzeUda() throws AnalysisException {