[Plugin] Add timeout of connection when downloading the plugins from URL (#3755)
If no timeout is set, the download process may be blocked forever.
This commit is contained in:
@ -267,9 +267,9 @@ The easiest way, you can implement your plugin by modifying the example `auditde
|
||||
|
||||
Doris's plugin can be deployed in three ways:
|
||||
|
||||
* Http or Https .zip, like `http://xxx.xxxxxx.com/data/plugin.zip`, Doris will download this .zip file
|
||||
* Local .zip, like `/home/work/data/plugin.zip`, need to be deployed on all FE and BE nodes
|
||||
* Local directory, like `/home/work/data/plugin`, .zip decompressed folder, need to be deployed on all FE, BE nodes
|
||||
* Http or Https .zip, like `http://xxx.xxxxxx.com/data/plugin.zip`, Doris will download this .zip file. At the same time, an md5 file with the same name as the `.zip` file needs to be placed. Such as `http://xxx.xxxxxx.com/data/my_plugin.zip.md5`. The content is the MD5 value of the .zip file.
|
||||
* Local .zip, like `/home/work/data/plugin.zip`. If the plug-in is only used for FE, it needs to be deployed in the same directory of all FE nodes. Otherwise, it needs to be deployed on all FE and BE nodes.
|
||||
* Local directory, like `/home/work/data/plugin`, .zip decompressed folder. If the plug-in is only used for FE, it needs to be deployed in the same directory of all FE nodes. Otherwise, it needs to be deployed on all FE and BE nodes.
|
||||
|
||||
Note: Need to ensure that the plugin .zip file is available in the life cycle of doris!
|
||||
|
||||
|
||||
@ -266,9 +266,9 @@ mvn archetype: generate -DarchetypeCatalog = internal -DgroupId = org.apache -Da
|
||||
|
||||
插件可以通过以下三种方式部署。
|
||||
|
||||
* 将 `.zip` 文件放在 Http 或 Https 服务器上。如:`http://xxx.xxxxxx.com/data/plugin.zip`, Doris 会下载这个文件。
|
||||
* 本地 `.zip` 文件。 如:`/home/work/data/plugin.zip`。需要在所有 FE 和 BE 节点部署。
|
||||
* 本地目录。如:`/home/work/data/plugin/`。相当于 `.zip` 文件解压后的目录。需要在所有 FE 和 BE 节点部署。
|
||||
* 将 `.zip` 文件放在 Http 或 Https 服务器上。如:`http://xxx.xxx.com/data/my_plugin.zip`, Doris 会下载这个文件。同时需要放置一个和 `.zip` 文件同名的 md5 文件。如 `http://xxx.xxxxxx.com/data/my_plugin.zip.md5`。其中内容为 .zip 文件的 MD5 值。
|
||||
* 本地 `.zip` 文件。 如:`/home/work/data/plugin.zip`。如果该插件仅用于 FE,则需部署在所有 FE 节点相同的目录下。否则,需要在所有 FE 和 BE 节点部署。
|
||||
* 本地目录。如:`/home/work/data/plugin/`。相当于 `.zip` 文件解压后的目录。如果该插件仅用于 FE,则需部署在所有 FE 节点相同的目录下。否则,需要在所有 FE 和 BE 节点部署。
|
||||
|
||||
注意:需保证部署路径在整个插件生命周期内有效。
|
||||
|
||||
|
||||
@ -337,6 +337,10 @@ public class Util {
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
// get response body as a string from the given url.
|
||||
// "encodedAuthInfo", the base64 encoded auth info. like:
|
||||
// Base64.encodeBase64String("user:passwd".getBytes());
|
||||
// If no auth info, pass a null.
|
||||
public static String getResultForUrl(String urlStr, String encodedAuthInfo, int connectTimeoutMs,
|
||||
int readTimeoutMs) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
@ -457,5 +461,21 @@ public class Util {
|
||||
return i + ORDINAL_SUFFIX[i % 10];
|
||||
}
|
||||
}
|
||||
|
||||
// get an input stream from url, the caller is responsible for closing the stream
|
||||
// "encodedAuthInfo", the base64 encoded auth info. like:
|
||||
// Base64.encodeBase64String("user:passwd".getBytes());
|
||||
// If no auth info, pass a null.
|
||||
public static InputStream getInputStreamFromUrl(String urlStr, String encodedAuthInfo, int connectTimeoutMs,
|
||||
int readTimeoutMs) throws IOException {
|
||||
URL url = new URL(urlStr);
|
||||
URLConnection conn = url.openConnection();
|
||||
if (encodedAuthInfo != null) {
|
||||
conn.setRequestProperty("Authorization", "Basic " + encodedAuthInfo);
|
||||
}
|
||||
conn.setConnectTimeout(connectTimeoutMs);
|
||||
conn.setReadTimeout(readTimeoutMs);
|
||||
return conn.getInputStream();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.plugin;
|
||||
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.Util;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
@ -34,7 +35,6 @@ import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.net.URL;
|
||||
import java.nio.file.FileSystems;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
@ -53,6 +53,9 @@ class PluginZip {
|
||||
|
||||
private static final List<String> DEFAULT_PROTOCOL = ImmutableList.of("https://", "http://");
|
||||
|
||||
// timeout for both connection and read. 10 seconds is long enough.
|
||||
private static final int HTTP_TIMEOUT_MS = 10000;
|
||||
|
||||
private String source;
|
||||
|
||||
private List<Path> cleanPathList;
|
||||
@ -113,13 +116,13 @@ class PluginZip {
|
||||
cleanPathList.add(zip);
|
||||
|
||||
// download zip
|
||||
try (InputStream in = openUrlInputStream(source)) {
|
||||
try (InputStream in = getInputStreamFromUrl(source)) {
|
||||
Files.copy(in, zip, StandardCopyOption.REPLACE_EXISTING);
|
||||
}
|
||||
|
||||
// .md5 check
|
||||
String expectedChecksum;
|
||||
try (InputStream in = openUrlInputStream(source + ".md5")) {
|
||||
try (InputStream in = getInputStreamFromUrl(source + ".md5")) {
|
||||
BufferedReader br = new BufferedReader(new InputStreamReader(in));
|
||||
expectedChecksum = br.readLine();
|
||||
} catch (IOException e) {
|
||||
@ -137,11 +140,6 @@ class PluginZip {
|
||||
return zip;
|
||||
}
|
||||
|
||||
InputStream openUrlInputStream(String url) throws IOException {
|
||||
URL u = new URL(url);
|
||||
return u.openConnection().getInputStream();
|
||||
}
|
||||
|
||||
/**
|
||||
* if `zipOrPath` is a zip file, unzip the specified .zip file to the targetPath.
|
||||
* if `zipOrPath` is a dir, copy the dir and its content to targetPath.
|
||||
@ -195,4 +193,7 @@ class PluginZip {
|
||||
return targetPath;
|
||||
}
|
||||
|
||||
InputStream getInputStreamFromUrl(String url) throws IOException {
|
||||
return Util.getInputStreamFromUrl(url, null, HTTP_TIMEOUT_MS, HTTP_TIMEOUT_MS);
|
||||
}
|
||||
}
|
||||
@ -21,17 +21,18 @@ import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.apache.doris.common.UserException;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.FileSystems;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import mockit.Expectations;
|
||||
|
||||
public class PluginZipTest {
|
||||
@ -56,10 +57,10 @@ public class PluginZipTest {
|
||||
// normal
|
||||
new Expectations(zip) {
|
||||
{
|
||||
zip.openUrlInputStream("source/test.zip");
|
||||
zip.getInputStreamFromUrl("source/test.zip");
|
||||
result = PluginTestUtil.openTestFile("source/test.zip");
|
||||
|
||||
zip.openUrlInputStream("source/test.zip.md5");
|
||||
zip.getInputStreamFromUrl("source/test.zip.md5");
|
||||
result = new ByteArrayInputStream(new String("7529db41471ec72e165f96fe9fb92742").getBytes());
|
||||
}
|
||||
};
|
||||
@ -76,19 +77,19 @@ public class PluginZipTest {
|
||||
|
||||
@Test
|
||||
public void testDownloadAndValidateZipMd5Error() {
|
||||
PluginZip util = new PluginZip("source/test.zip");
|
||||
PluginZip zip = new PluginZip("source/test.zip");
|
||||
try {
|
||||
new Expectations(util) {
|
||||
new Expectations(zip) {
|
||||
{
|
||||
util.openUrlInputStream("source/test.zip");
|
||||
zip.getInputStreamFromUrl("source/test.zip");
|
||||
result = PluginTestUtil.openTestFile("source/test.zip");
|
||||
|
||||
util.openUrlInputStream("source/test.zip.md5");
|
||||
zip.getInputStreamFromUrl("source/test.zip.md5");
|
||||
result = new ByteArrayInputStream(new String("asdfas").getBytes());
|
||||
}
|
||||
};
|
||||
|
||||
Path zipPath = util.downloadRemoteZip(PluginTestUtil.getTestPath("target"));
|
||||
Path zipPath = zip.downloadRemoteZip(PluginTestUtil.getTestPath("target"));
|
||||
assertFalse(Files.exists(zipPath));
|
||||
} catch (Exception e) {
|
||||
assertTrue(e instanceof UserException);
|
||||
|
||||
Reference in New Issue
Block a user