diff --git a/fe/pom.xml b/fe/pom.xml index 5c82f6fafb..d1731d3e86 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -578,6 +578,10 @@ under the License. maven-surefire-plugin 2.22.2 + set larger, eg, 3, to reduce the time or running FE unit tests<--> + 1 + not reuse forked jvm, so that each unit test will run in separate jvm. to avoid singleton confict<--> + false -javaagent:${settings.localRepository}/org/jmockit/jmockit/1.48/jmockit-1.48.jar diff --git a/fe/src/main/java/org/apache/doris/PaloFe.java b/fe/src/main/java/org/apache/doris/PaloFe.java index 81217f9c12..81c2fb4b0f 100644 --- a/fe/src/main/java/org/apache/doris/PaloFe.java +++ b/fe/src/main/java/org/apache/doris/PaloFe.java @@ -51,25 +51,36 @@ import java.nio.channels.OverlappingFileLockException; public class PaloFe { private static final Logger LOG = LogManager.getLogger(PaloFe.class); - // entrance for palo frontend + public static final String DORIS_HOME_DIR = System.getenv("DORIS_HOME"); + public static final String PID_DIR = System.getenv("PID_DIR"); + public static void main(String[] args) { + start(DORIS_HOME_DIR, PID_DIR, args); + } + + // entrance for doris frontend + public static void start(String dorisHomeDir, String pidDir, String[] args) { + if (Strings.isNullOrEmpty(dorisHomeDir)) { + System.err.println("env DORIS_HOME is not set."); + return; + } + + if (Strings.isNullOrEmpty(pidDir)) { + System.err.println("env PID_DIR is not set."); + return; + } + CommandLineOptions cmdLineOpts = parseArgs(args); System.out.println(cmdLineOpts.toString()); try { - final String paloHome = System.getenv("DORIS_HOME"); - if (Strings.isNullOrEmpty(paloHome)) { - System.out.println("env DORIS_HOME is not set."); - return; - } - // pid file - if (!createAndLockPidFile(System.getenv("PID_DIR") + "/fe.pid")) { + if (!createAndLockPidFile(pidDir + "/fe.pid")) { throw new IOException("pid file is already locked."); } // init config - new Config().init(paloHome + "/conf/fe.conf"); + new Config().init(dorisHomeDir + "/conf/fe.conf"); Log4jConfig.initLogging(); // set dns cache ttl @@ -103,11 +114,11 @@ public class PaloFe { while (true) { Thread.sleep(2000); } - } catch (Throwable exception) { - exception.printStackTrace(); + } catch (Throwable e) { + e.printStackTrace(); System.exit(-1); } - } // end PaloFe main() + } /* * -v --version @@ -228,7 +239,7 @@ public class PaloFe { System.out.println("Build hash: " + Version.PALO_BUILD_HASH); System.exit(0); } else if (cmdLineOpts.runBdbTools()) { - BDBTool bdbTool = new BDBTool(Catalog.BDB_DIR, cmdLineOpts.getBdbToolOpts()); + BDBTool bdbTool = new BDBTool(Catalog.getCurrentCatalog().getBdbDir(), cmdLineOpts.getBdbToolOpts()); if (bdbTool.run()) { System.exit(0); } else { diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index b9cf8953b1..288c189923 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -32,6 +32,7 @@ import org.apache.doris.analysis.AlterDatabaseQuotaStmt; import org.apache.doris.analysis.AlterDatabaseRename; import org.apache.doris.analysis.AlterSystemStmt; import org.apache.doris.analysis.AlterTableStmt; +import org.apache.doris.analysis.AlterViewStmt; import org.apache.doris.analysis.BackupStmt; import org.apache.doris.analysis.CancelAlterSystemStmt; import org.apache.doris.analysis.CancelAlterTableStmt; @@ -51,13 +52,11 @@ import org.apache.doris.analysis.DropDbStmt; import org.apache.doris.analysis.DropFunctionStmt; import org.apache.doris.analysis.DropPartitionClause; import org.apache.doris.analysis.DropTableStmt; -import org.apache.doris.analysis.TruncateTableStmt; import org.apache.doris.analysis.FunctionName; import org.apache.doris.analysis.HashDistributionDesc; import org.apache.doris.analysis.KeysDesc; import org.apache.doris.analysis.LinkDbStmt; import org.apache.doris.analysis.MigrateDbStmt; -import org.apache.doris.analysis.AlterViewStmt; import org.apache.doris.analysis.ModifyPartitionClause; import org.apache.doris.analysis.PartitionDesc; import org.apache.doris.analysis.PartitionRenameClause; @@ -69,9 +68,10 @@ import org.apache.doris.analysis.RestoreStmt; import org.apache.doris.analysis.RollupRenameClause; import org.apache.doris.analysis.ShowAlterStmt.AlterType; import org.apache.doris.analysis.SingleRangePartitionDesc; -import org.apache.doris.analysis.TableRenameClause; -import org.apache.doris.analysis.TableRef; import org.apache.doris.analysis.TableName; +import org.apache.doris.analysis.TableRef; +import org.apache.doris.analysis.TableRenameClause; +import org.apache.doris.analysis.TruncateTableStmt; import org.apache.doris.analysis.UserDesc; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.backup.BackupHandler; @@ -157,9 +157,9 @@ import org.apache.doris.persist.DatabaseInfo; import org.apache.doris.persist.DropInfo; import org.apache.doris.persist.DropLinkDbAndUpdateDbInfo; import org.apache.doris.persist.DropPartitionInfo; -import org.apache.doris.persist.ModifyTablePropertyOperationLog; import org.apache.doris.persist.EditLog; import org.apache.doris.persist.ModifyPartitionInfo; +import org.apache.doris.persist.ModifyTablePropertyOperationLog; import org.apache.doris.persist.PartitionPersistInfo; import org.apache.doris.persist.RecoverInfo; import org.apache.doris.persist.ReplicaPersistInfo; @@ -250,8 +250,12 @@ public class Catalog { private static final int HTTP_TIMEOUT_SECOND = 5; private static final int STATE_CHANGE_CHECK_INTERVAL_MS = 100; private static final int REPLAY_INTERVAL_MS = 1; - public static final String BDB_DIR = Config.meta_dir + "/bdb"; - public static final String IMAGE_DIR = Config.meta_dir + "/image"; + private static final String BDB_DIR = "/bdb"; + private static final String IMAGE_DIR = "/image"; + + private String metaDir; + private String bdbDir; + private String imageDir; private MetaContext metaContext; private long epoch = 0; @@ -312,7 +316,6 @@ public class Catalog { private CatalogIdGenerator idGenerator = new CatalogIdGenerator(NEXT_ID_INIT_VALUE); - private String metaDir; private EditLog editLog; private int clusterId; private String token; @@ -505,6 +508,10 @@ public class Catalog { this.dynamicPartitionScheduler = new DynamicPartitionScheduler("DynamicPartitionScheduler", Config.dynamic_partition_check_interval_seconds * 1000L); + + this.metaDir = Config.meta_dir; + this.bdbDir = this.metaDir + BDB_DIR; + this.imageDir = this.metaDir + IMAGE_DIR; } public static void destroyCheckpoint() { @@ -635,7 +642,23 @@ public class Catalog { } } + public String getBdbDir() { + return bdbDir; + } + + public String getImageDir() { + return imageDir; + } + public void initialize(String[] args) throws Exception { + // set meta dir first. + // we already set these variables in constructor. but Catalog is a singleton class. + // so they may be set before Config is initialized. + // set them here again to make sure these variables use values in fe.conf. + this.metaDir = Config.meta_dir; + this.bdbDir = this.metaDir + BDB_DIR; + this.imageDir = this.metaDir + IMAGE_DIR; + // 0. get local node and helper node info getSelfHostPort(); getHelperNodes(args); @@ -648,12 +671,12 @@ public class Catalog { } if (Config.edit_log_type.equalsIgnoreCase("bdb")) { - File bdbDir = new File(BDB_DIR); + File bdbDir = new File(this.bdbDir); if (!bdbDir.exists()) { bdbDir.mkdirs(); } - File imageDir = new File(IMAGE_DIR); + File imageDir = new File(this.imageDir); if (!imageDir.exists()) { imageDir.mkdirs(); } @@ -667,7 +690,7 @@ public class Catalog { // 3. Load image first and replay edits this.editLog = new EditLog(nodeName); - loadImage(IMAGE_DIR); // load image file + loadImage(this.imageDir); // load image file editLog.open(); // open bdb env this.globalTransactionMgr.setEditLog(editLog); this.idGenerator.setEditLog(editLog); @@ -684,19 +707,15 @@ public class Catalog { } // wait until FE is ready. - public void waitForReady() { + public void waitForReady() throws InterruptedException { while (true) { if (isReady()) { LOG.info("catalog is ready. FE type: {}", feType); break; } - try { - Thread.sleep(2000); - LOG.info("wait catalog to be ready. FE type: {}. is ready: {}", feType, isReady.get()); - } catch (InterruptedException e) { - e.printStackTrace(); - } + Thread.sleep(2000); + LOG.info("wait catalog to be ready. FE type: {}. is ready: {}", feType, isReady.get()); } } @@ -705,8 +724,8 @@ public class Catalog { } private void getClusterIdAndRole() throws IOException { - File roleFile = new File(IMAGE_DIR, Storage.ROLE_FILE); - File versionFile = new File(IMAGE_DIR, Storage.VERSION_FILE); + File roleFile = new File(this.imageDir, Storage.ROLE_FILE); + File versionFile = new File(this.imageDir, Storage.VERSION_FILE); // if helper node is point to self, or there is ROLE and VERSION file in local. // get the node type from local @@ -732,7 +751,7 @@ public class Catalog { // FOLLOWER, which may cause UNDEFINED behavior. // Everything may be OK if the origin role is exactly FOLLOWER, // but if not, FE process will exit somehow. - Storage storage = new Storage(IMAGE_DIR); + Storage storage = new Storage(this.imageDir); if (!roleFile.exists()) { // The very first time to start the first node of the cluster. // It should became a Master node (Master node's role is also FOLLOWER, which means electable) @@ -769,7 +788,7 @@ public class Catalog { clusterId = Config.cluster_id == -1 ? Storage.newClusterID() : Config.cluster_id; token = Strings.isNullOrEmpty(Config.auth_token) ? Storage.newToken() : Config.auth_token; - storage = new Storage(clusterId, token, IMAGE_DIR); + storage = new Storage(clusterId, token, this.imageDir); storage.writeClusterIdAndToken(); isFirstTimeStartUp = true; @@ -820,7 +839,7 @@ public class Catalog { Pair rightHelperNode = helperNodes.get(0); - Storage storage = new Storage(IMAGE_DIR); + Storage storage = new Storage(this.imageDir); if (roleFile.exists() && (role != storage.getRole() || !nodeName.equals(storage.getNodeName())) || !roleFile.exists()) { storage.writeFrontendRoleAndNodeName(role, nodeName); @@ -834,7 +853,7 @@ public class Catalog { // NOTE: cluster_id will be init when Storage object is constructed, // so we new one. - storage = new Storage(IMAGE_DIR); + storage = new Storage(this.imageDir); clusterId = storage.getClusterID(); token = storage.getToken(); if (Strings.isNullOrEmpty(token)) { @@ -1015,8 +1034,8 @@ public class Catalog { Preconditions.checkNotNull(deployManager); // 1. check if this is the first time to start up - File roleFile = new File(IMAGE_DIR, Storage.ROLE_FILE); - File versionFile = new File(IMAGE_DIR, Storage.VERSION_FILE); + File roleFile = new File(this.imageDir, Storage.ROLE_FILE); + File versionFile = new File(this.imageDir, Storage.VERSION_FILE); if ((roleFile.exists() && !versionFile.exists()) || (!roleFile.exists() && versionFile.exists())) { LOG.error("role file and version file must both exist or both not exist. " @@ -1267,7 +1286,7 @@ public class Catalog { private boolean getVersionFileFromHelper(Pair helperNode) throws IOException { try { String url = "http://" + helperNode.first + ":" + Config.http_port + "/version"; - File dir = new File(IMAGE_DIR); + File dir = new File(this.imageDir); MetaHelper.getRemoteFile(url, HTTP_TIMEOUT_SECOND * 1000, MetaHelper.getOutputStream(Storage.VERSION_FILE, dir)); MetaHelper.complete(Storage.VERSION_FILE, dir); @@ -1281,7 +1300,7 @@ public class Catalog { private void getNewImage(Pair helperNode) throws IOException { long localImageVersion = 0; - Storage storage = new Storage(IMAGE_DIR); + Storage storage = new Storage(this.imageDir); localImageVersion = storage.getImageSeq(); try { @@ -1292,7 +1311,7 @@ public class Catalog { String url = "http://" + helperNode.first + ":" + Config.http_port + "/image?version=" + version; String filename = Storage.IMAGE + "." + version; - File dir = new File(IMAGE_DIR); + File dir = new File(this.imageDir); MetaHelper.getRemoteFile(url, HTTP_TIMEOUT_SECOND * 1000, MetaHelper.getOutputStream(filename, dir)); MetaHelper.complete(filename, dir); } @@ -1762,9 +1781,9 @@ public class Catalog { // Only called by checkpoint thread public void saveImage() throws IOException { // Write image.ckpt - Storage storage = new Storage(IMAGE_DIR); + Storage storage = new Storage(this.imageDir); File curFile = storage.getImageFile(replayedJournalId.get()); - File ckpt = new File(IMAGE_DIR, Storage.IMAGE_NEW); + File ckpt = new File(this.imageDir, Storage.IMAGE_NEW); saveImage(ckpt, replayedJournalId.get()); // Move image.ckpt to image.dataVersion @@ -4674,10 +4693,6 @@ public class Catalog { return this.canRead.get(); } - public void setMetaDir(String metaDir) { - this.metaDir = metaDir; - } - public boolean isElectable() { return this.isElectable; } diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index 4383c82db2..282a04dd87 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -17,6 +17,8 @@ package org.apache.doris.common; +import org.apache.doris.PaloFe; + public class Config extends ConfigBase { /* @@ -55,7 +57,8 @@ public class Config extends ConfigBase { * 60m 60 mins * 120s 120 seconds */ - @ConfField public static String sys_log_dir = System.getenv("DORIS_HOME") + "/log"; + @ConfField + public static String sys_log_dir = PaloFe.DORIS_HOME_DIR + "/log"; @ConfField public static String sys_log_level = "INFO"; @ConfField public static int sys_log_roll_num = 10; @ConfField public static String[] sys_log_verbose_modules = {}; @@ -90,7 +93,7 @@ public class Config extends ConfigBase { * 60m 60 mins * 120s 120 seconds */ - @ConfField public static String audit_log_dir = System.getenv("DORIS_HOME") + "/log"; + @ConfField public static String audit_log_dir = PaloFe.DORIS_HOME_DIR + "/log"; @ConfField public static int audit_log_roll_num = 90; @ConfField public static String[] audit_log_modules = {"slow_query", "query"}; @ConfField(mutable = true) public static long qe_slow_log_ms = 5000; @@ -130,13 +133,13 @@ public class Config extends ConfigBase { * 1. High write performance (SSD) * 2. Safe (RAID) */ - @ConfField public static String meta_dir = System.getenv("DORIS_HOME") + "/palo-meta"; + @ConfField public static String meta_dir = PaloFe.DORIS_HOME_DIR + "/palo-meta"; /* * temp dir is used to save intermediate results of some process, such as backup and restore process. * file in this dir will be cleaned after these process is finished. */ - @ConfField public static String tmp_dir = System.getenv("DORIS_HOME") + "/temp_dir"; + @ConfField public static String tmp_dir = PaloFe.DORIS_HOME_DIR + "/temp_dir"; /* * Edit log type. @@ -920,7 +923,7 @@ public class Config extends ConfigBase { /* * Save small files */ - @ConfField public static String small_file_dir = System.getenv("DORIS_HOME") + "/small_files"; + @ConfField public static String small_file_dir = PaloFe.DORIS_HOME_DIR + "/small_files"; /* * The following 2 configs can set to true to disable the automatic colocate tables's relocate and balance. diff --git a/fe/src/main/java/org/apache/doris/common/util/PrintableMap.java b/fe/src/main/java/org/apache/doris/common/util/PrintableMap.java index 615211de5a..8c14b49882 100644 --- a/fe/src/main/java/org/apache/doris/common/util/PrintableMap.java +++ b/fe/src/main/java/org/apache/doris/common/util/PrintableMap.java @@ -29,6 +29,7 @@ public class PrintableMap { private boolean withQuotation; private boolean wrap; private boolean hidePassword; + private String entryDelimiter = ","; public static final Set SENSITIVE_KEY; static { @@ -37,14 +38,20 @@ public class PrintableMap { SENSITIVE_KEY.add("kerberos_keytab_content"); SENSITIVE_KEY.add("bos_secret_accesskey"); } - + public PrintableMap(Map map, String keyValueSaperator, - boolean withQuotation, boolean wrap) { + boolean withQuotation, boolean wrap, String entryDelimiter) { this.map = map; this.keyValueSaperator = keyValueSaperator; this.withQuotation = withQuotation; this.wrap = wrap; this.hidePassword = false; + this.entryDelimiter = entryDelimiter; + } + + public PrintableMap(Map map, String keyValueSaperator, + boolean withQuotation, boolean wrap) { + this(map, keyValueSaperator, withQuotation, wrap, ","); } public PrintableMap(Map map, String keyValueSaperator, @@ -79,10 +86,11 @@ public class PrintableMap { sb.append("\""); } if (iter.hasNext()) { + sb.append(entryDelimiter); if (wrap) { - sb.append(",\n"); + sb.append("\n"); } else { - sb.append(", "); + sb.append(" "); } } } diff --git a/fe/src/main/java/org/apache/doris/http/action/StaticResourceAction.java b/fe/src/main/java/org/apache/doris/http/action/StaticResourceAction.java index 674c0d2d5f..89b668e433 100644 --- a/fe/src/main/java/org/apache/doris/http/action/StaticResourceAction.java +++ b/fe/src/main/java/org/apache/doris/http/action/StaticResourceAction.java @@ -17,6 +17,7 @@ package org.apache.doris.http.action; +import org.apache.doris.PaloFe; import org.apache.doris.http.ActionController; import org.apache.doris.http.BaseRequest; import org.apache.doris.http.BaseResponse; @@ -116,7 +117,7 @@ public class StaticResourceAction extends WebBaseAction { } public static void registerAction(ActionController controller) throws IllegalArgException { - String httpDir = System.getenv("DORIS_HOME") + "/webroot"; + String httpDir = PaloFe.DORIS_HOME_DIR + "/webroot"; StaticResourceAction action = new StaticResourceAction(controller, httpDir + "/static"); controller.registerHandler(HttpMethod.GET, "/static/js", action); controller.registerHandler(HttpMethod.GET, "/static/css", action); diff --git a/fe/src/main/java/org/apache/doris/http/meta/MetaService.java b/fe/src/main/java/org/apache/doris/http/meta/MetaService.java index 15ea6e45fd..3027424d33 100644 --- a/fe/src/main/java/org/apache/doris/http/meta/MetaService.java +++ b/fe/src/main/java/org/apache/doris/http/meta/MetaService.java @@ -179,7 +179,7 @@ public class MetaService { + "/image?version=" + versionStr; String filename = Storage.IMAGE + "." + versionStr; - File dir = new File(Catalog.IMAGE_DIR); + File dir = new File(Catalog.getCurrentCatalog().getImageDir()); try { OutputStream out = MetaHelper.getOutputStream(filename, dir); MetaHelper.getRemoteFile(url, TIMEOUT_SECOND * 1000, out); diff --git a/fe/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java b/fe/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java index cd48d45e17..6ef3d980ac 100644 --- a/fe/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java +++ b/fe/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java @@ -81,7 +81,7 @@ public class BDBJEJournal implements Journal { * node name is ip_port (the port is edit_log_port) */ private void initBDBEnv(String nodeName) { - environmentPath = Catalog.BDB_DIR; + environmentPath = Catalog.getCurrentCatalog().getBdbDir(); try { Pair selfNode = Catalog.getInstance().getSelfNode(); if (isPortUsing(selfNode.first, selfNode.second)) { diff --git a/fe/src/main/java/org/apache/doris/journal/bdbje/BDBTool.java b/fe/src/main/java/org/apache/doris/journal/bdbje/BDBTool.java index 23e33ff09e..aa8b616087 100644 --- a/fe/src/main/java/org/apache/doris/journal/bdbje/BDBTool.java +++ b/fe/src/main/java/org/apache/doris/journal/bdbje/BDBTool.java @@ -65,7 +65,7 @@ public class BDBTool { env = new Environment(new File(metaPath), envConfig); } catch (DatabaseException e) { e.printStackTrace(); - System.err.println("Failed to open BDBJE env: " + Catalog.BDB_DIR + ". exit"); + System.err.println("Failed to open BDBJE env: " + Catalog.getCurrentCatalog().getBdbDir() + ". exit"); return false; } Preconditions.checkNotNull(env); diff --git a/fe/src/main/java/org/apache/doris/load/DppScheduler.java b/fe/src/main/java/org/apache/doris/load/DppScheduler.java index 4aeb170cf1..531486e811 100644 --- a/fe/src/main/java/org/apache/doris/load/DppScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/DppScheduler.java @@ -17,6 +17,7 @@ package org.apache.doris.load; +import org.apache.doris.PaloFe; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.LoadException; @@ -53,12 +54,11 @@ import java.util.concurrent.ConcurrentMap; public class DppScheduler { private static final Logger LOG = LogManager.getLogger(DppScheduler.class); - private static final String DORIS_HOME = System.getenv("DORIS_HOME"); - private static final String HADOOP_CLIENT = DORIS_HOME + Config.dpp_hadoop_client_path; + private static final String HADOOP_CLIENT = PaloFe.DORIS_HOME_DIR + Config.dpp_hadoop_client_path; private static final String DPP_OUTPUT_DIR = "export"; - private static final String JOB_CONFIG_DIR = DORIS_HOME + "/temp/job_conf"; + private static final String JOB_CONFIG_DIR = PaloFe.DORIS_HOME_DIR + "/temp/job_conf"; private static final String JOB_CONFIG_FILE = "jobconfig.json"; - private static final String LOCAL_DPP_DIR = DORIS_HOME + "/lib/dpp/" + FeConstants.dpp_version; + private static final String LOCAL_DPP_DIR = PaloFe.DORIS_HOME_DIR + "/lib/dpp/" + FeConstants.dpp_version; private static final int DEFAULT_REDUCE_NUM = 1000; private static final long GB = 1024 * 1024 * 1024L; diff --git a/fe/src/main/java/org/apache/doris/master/Checkpoint.java b/fe/src/main/java/org/apache/doris/master/Checkpoint.java index 9c741ef8ac..1fa486c40e 100644 --- a/fe/src/main/java/org/apache/doris/master/Checkpoint.java +++ b/fe/src/main/java/org/apache/doris/master/Checkpoint.java @@ -55,7 +55,7 @@ public class Checkpoint extends MasterDaemon { public Checkpoint(EditLog editLog) { super("leaderCheckpointer", FeConstants.checkpoint_interval_second * 1000L); - this.imageDir = Catalog.IMAGE_DIR; + this.imageDir = Catalog.getCurrentCatalog().getImageDir(); this.editLog = editLog; } diff --git a/fe/src/main/java/org/apache/doris/master/MetaHelper.java b/fe/src/main/java/org/apache/doris/master/MetaHelper.java index 4ae630e333..5071b1f47a 100644 --- a/fe/src/main/java/org/apache/doris/master/MetaHelper.java +++ b/fe/src/main/java/org/apache/doris/master/MetaHelper.java @@ -36,7 +36,7 @@ public class MetaHelper { private static final int CHECKPOINT_LIMIT_BYTES = 30 * 1024 * 1024; public static File getMasterImageDir() { - String metaDir = Catalog.IMAGE_DIR; + String metaDir = Catalog.getCurrentCatalog().getImageDir(); return new File(metaDir); } diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index b42da95fe2..d798278f0d 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -492,8 +492,8 @@ public class Coordinator { errMsg = "exec rpc error. backend id: " + pair.first.backend.getId(); } queryStatus.setStatus(errMsg); - LOG.warn("exec plan fragment failed, errmsg={}, fragmentId={}, backend={}:{}", - errMsg, fragment.getFragmentId(), + LOG.warn("exec plan fragment failed, errmsg={}, code: {}, fragmentId={}, backend={}:{}", + errMsg, code, fragment.getFragmentId(), pair.first.address.hostname, pair.first.address.port); cancelInternal(PPlanFragmentCancelReason.INTERNAL_ERROR); switch (code) { diff --git a/fe/src/main/java/org/apache/doris/rpc/AttachmentRequest.java b/fe/src/main/java/org/apache/doris/rpc/AttachmentRequest.java index 166a9c93ec..568ba29527 100644 --- a/fe/src/main/java/org/apache/doris/rpc/AttachmentRequest.java +++ b/fe/src/main/java/org/apache/doris/rpc/AttachmentRequest.java @@ -22,7 +22,7 @@ import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; -// used to compatible with older our thrift protocol +// used to compatible with our older thrift protocol public class AttachmentRequest { protected byte[] serializedRequest; protected byte[] serializedResult; diff --git a/fe/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java b/fe/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java new file mode 100644 index 0000000000..61a65679b4 --- /dev/null +++ b/fe/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java @@ -0,0 +1,167 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.utframe; + +import org.apache.doris.analysis.CreateDbStmt; +import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.Pair; +import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.Planner; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.utframe.MockedBackendFactory.DefaultBeThriftServiceImpl; +import org.apache.doris.utframe.MockedBackendFactory.DefaultHeartbeatServiceImpl; +import org.apache.doris.utframe.MockedBackendFactory.DefaultPBackendServiceImpl; +import org.apache.doris.utframe.MockedFrontend.EnvVarNotSetException; +import org.apache.doris.utframe.MockedFrontend.FeStartException; +import org.apache.doris.utframe.MockedFrontend.NotInitException; + +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + +/* + * This demo is mainly used to confirm that + * repeatedly starting FE and BE in 2 UnitTest will not cause conflict + */ +public class AnotherDemoTest { + + private static int fe_http_port; + private static int fe_rpc_port; + private static int fe_query_port; + private static int fe_edit_log_port; + + private static int be_heartbeat_port; + private static int be_thrift_port; + private static int be_brpc_port; + private static int be_http_port; + + // use a unique dir so that it won't be conflict with other unit test which + // may also start a Mocked Frontend + private static String runningDir = "fe/mocked/AnotherDemoTest/" + UUID.randomUUID().toString() + "/"; + + @BeforeClass + public static void beforeClass() throws EnvVarNotSetException, IOException, + FeStartException, NotInitException, DdlException, InterruptedException { + // get DORIS_HOME + final String dorisHome = System.getenv("DORIS_HOME"); + if (Strings.isNullOrEmpty(dorisHome)) { + throw new EnvVarNotSetException("env DORIS_HOME is not set"); + } + + getRandomPort(); + + // start fe in "DORIS_HOME/fe/mocked/" + MockedFrontend frontend = MockedFrontend.getInstance(); + Map feConfMap = Maps.newHashMap(); + // set additional fe config + feConfMap.put("http_port", String.valueOf(fe_http_port)); + feConfMap.put("rpc_port", String.valueOf(fe_rpc_port)); + feConfMap.put("query_port", String.valueOf(fe_query_port)); + feConfMap.put("edit_log_port", String.valueOf(fe_edit_log_port)); + feConfMap.put("tablet_create_timeout_second", "10"); + frontend.init(dorisHome + "/" + runningDir, feConfMap); + frontend.start(new String[0]); + + // start be + MockedBackend backend = MockedBackendFactory.createBackend("127.0.0.1", + be_heartbeat_port, be_thrift_port, be_brpc_port, be_http_port, + new DefaultHeartbeatServiceImpl(be_thrift_port, be_http_port, be_brpc_port), + new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl()); + backend.setFeAddress(new TNetworkAddress("127.0.0.1", frontend.getRpcPort())); + backend.start(); + + // add be + List> bes = Lists.newArrayList(); + bes.add(Pair.create(backend.getHost(), backend.getHeartbeatPort())); + Catalog.getCurrentSystemInfo().addBackends(bes, false, "default_cluster"); + + // sleep to wait first heartbeat + Thread.sleep(5000); + } + + // generate all port from between 20000 ~ 30000 + private static void getRandomPort() { + Random r = new Random(System.currentTimeMillis()); + int basePort = 20000 + r.nextInt(9000); + fe_http_port = basePort + 1; + fe_rpc_port = basePort + 2; + fe_query_port = basePort + 3; + fe_edit_log_port = basePort + 4; + + be_heartbeat_port = basePort + 5; + be_thrift_port = basePort + 6; + be_brpc_port = basePort + 7; + be_http_port = basePort + 8; + } + + @Test + public void testCreateDbAndTable() throws Exception { + // 1. create connect context + ConnectContext ctx = UtFrameUtils.createDefaultCtx(); + // 2. create database db1 + String createDbStmtStr = "create database db1;"; + CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, ctx); + Catalog.getCurrentCatalog().createDb(createDbStmt); + System.out.println(Catalog.getCurrentCatalog().getDbNames()); + // 3. create table tbl1 + String createTblStmtStr = "create table db1.tbl1(k1 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');"; + CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr, ctx); + Catalog.getCurrentCatalog().createTable(createTableStmt); + // 4. get and test the created db and table + Database db = Catalog.getCurrentCatalog().getDb("default_cluster:db1"); + Assert.assertNotNull(db); + db.readLock(); + try { + OlapTable tbl = (OlapTable) db.getTable("tbl1"); + Assert.assertNotNull(tbl); + System.out.println(tbl.getName()); + Assert.assertEquals("Doris", tbl.getEngine()); + Assert.assertEquals(1, tbl.getBaseSchema().size()); + } finally { + db.readUnlock(); + } + // 5. query + // TODO: we can not process real query for now. So it has to be a explain query + String queryStr = "explain select * from db1.tbl1"; + StmtExecutor stmtExecutor = new StmtExecutor(ctx, queryStr); + stmtExecutor.execute(); + Planner planner = stmtExecutor.planner(); + List fragments = planner.getFragments(); + Assert.assertEquals(1, fragments.size()); + PlanFragment fragment = fragments.get(0); + Assert.assertTrue(fragment.getPlanRoot() instanceof OlapScanNode); + Assert.assertEquals(0, fragment.getChildren().size()); + } +} diff --git a/fe/src/test/java/org/apache/doris/utframe/DemoTest.java b/fe/src/test/java/org/apache/doris/utframe/DemoTest.java new file mode 100644 index 0000000000..a5498be1d9 --- /dev/null +++ b/fe/src/test/java/org/apache/doris/utframe/DemoTest.java @@ -0,0 +1,194 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.utframe; + +import org.apache.doris.alter.AlterJobV2; +import org.apache.doris.analysis.AlterTableStmt; +import org.apache.doris.analysis.CreateDbStmt; +import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.Pair; +import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.Planner; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.utframe.MockedBackendFactory.DefaultBeThriftServiceImpl; +import org.apache.doris.utframe.MockedBackendFactory.DefaultHeartbeatServiceImpl; +import org.apache.doris.utframe.MockedBackendFactory.DefaultPBackendServiceImpl; +import org.apache.doris.utframe.MockedFrontend.EnvVarNotSetException; +import org.apache.doris.utframe.MockedFrontend.FeStartException; +import org.apache.doris.utframe.MockedFrontend.NotInitException; + +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + +/* + * This demo shows how to run unit test with mocked FE and BE. + * It will + * 1. start a mocked FE and a mocked BE. + * 2. Create a database and a tbl. + * 3. Make a schema change to tbl. + * 4. send a query and get query plan + */ +public class DemoTest { + + private static int fe_http_port; + private static int fe_rpc_port; + private static int fe_query_port; + private static int fe_edit_log_port; + + private static int be_heartbeat_port; + private static int be_thrift_port; + private static int be_brpc_port; + private static int be_http_port; + // use a unique dir so that it won't be conflict with other unit test which + // may also start a Mocked Frontend + private static String runningDir = "fe/mocked/DemoTest/" + UUID.randomUUID().toString() + "/"; + + @BeforeClass + public static void beforeClass() throws EnvVarNotSetException, IOException, + FeStartException, NotInitException, DdlException, InterruptedException { + // get DORIS_HOME + final String dorisHome = System.getenv("DORIS_HOME"); + if (Strings.isNullOrEmpty(dorisHome)) { + throw new EnvVarNotSetException("env DORIS_HOME is not set"); + } + + getRandomPort(); + + // start fe in "DORIS_HOME/fe/mocked/" + MockedFrontend frontend = MockedFrontend.getInstance(); + Map feConfMap = Maps.newHashMap(); + // set additional fe config + feConfMap.put("http_port", String.valueOf(fe_http_port)); + feConfMap.put("rpc_port", String.valueOf(fe_rpc_port)); + feConfMap.put("query_port", String.valueOf(fe_query_port)); + feConfMap.put("edit_log_port", String.valueOf(fe_edit_log_port)); + feConfMap.put("tablet_create_timeout_second", "10"); + frontend.init(dorisHome + "/" + runningDir, feConfMap); + frontend.start(new String[0]); + + // start be + MockedBackend backend = MockedBackendFactory.createBackend("127.0.0.1", + be_heartbeat_port, be_thrift_port, be_brpc_port, be_http_port, + new DefaultHeartbeatServiceImpl(be_thrift_port, be_http_port, be_brpc_port), + new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl()); + backend.setFeAddress(new TNetworkAddress("127.0.0.1", frontend.getRpcPort())); + backend.start(); + + // add be + List> bes = Lists.newArrayList(); + bes.add(Pair.create(backend.getHost(), backend.getHeartbeatPort())); + Catalog.getCurrentSystemInfo().addBackends(bes, false, "default_cluster"); + + // sleep to wait first heartbeat + Thread.sleep(6000); + } + + // generate all port from between 20000 ~ 30000 + private static void getRandomPort() { + Random r = new Random(System.currentTimeMillis()); + int basePort = 20000 + r.nextInt(9000); + fe_http_port = basePort + 1; + fe_rpc_port = basePort + 2; + fe_query_port = basePort + 3; + fe_edit_log_port = basePort + 4; + + be_heartbeat_port = basePort + 5; + be_thrift_port = basePort + 6; + be_brpc_port = basePort + 7; + be_http_port = basePort + 8; + } + + @Test + public void testCreateDbAndTable() throws Exception { + // 1. create connect context + ConnectContext ctx = UtFrameUtils.createDefaultCtx(); + // 2. create database db1 + String createDbStmtStr = "create database db1;"; + CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, ctx); + Catalog.getCurrentCatalog().createDb(createDbStmt); + System.out.println(Catalog.getCurrentCatalog().getDbNames()); + // 3. create table tbl1 + String createTblStmtStr = "create table db1.tbl1(k1 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');"; + CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr, ctx); + Catalog.getCurrentCatalog().createTable(createTableStmt); + // 4. get and test the created db and table + Database db = Catalog.getCurrentCatalog().getDb("default_cluster:db1"); + Assert.assertNotNull(db); + db.readLock(); + try { + OlapTable tbl = (OlapTable) db.getTable("tbl1"); + Assert.assertNotNull(tbl); + System.out.println(tbl.getName()); + Assert.assertEquals("Doris", tbl.getEngine()); + Assert.assertEquals(1, tbl.getBaseSchema().size()); + } finally { + db.readUnlock(); + } + // 5. process a schema change job + String alterStmtStr = "alter table db1.tbl1 add column k2 int default '1'"; + AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, ctx); + Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt); + // 6. check alter job + Map alterJobs = Catalog.getCurrentCatalog().getSchemaChangeHandler().getAlterJobsV2(); + Assert.assertEquals(1, alterJobs.size()); + for (AlterJobV2 alterJobV2 : alterJobs.values()) { + while (!alterJobV2.getJobState().isFinalState()) { + System.out.println("alter job " + alterJobV2.getDbId() + " is running. state: " + alterJobV2.getJobState()); + Thread.sleep(5000); + } + System.out.println("alter job " + alterJobV2.getDbId() + " is done. state: " + alterJobV2.getJobState()); + Assert.assertEquals(AlterJobV2.JobState.FINISHED, alterJobV2.getJobState()); + } + db.readLock(); + try { + OlapTable tbl = (OlapTable) db.getTable("tbl1"); + Assert.assertEquals(2, tbl.getBaseSchema().size()); + } finally { + db.readUnlock(); + } + // 7. query + // TODO: we can not process real query for now. So it has to be a explain query + String queryStr = "explain select * from db1.tbl1"; + StmtExecutor stmtExecutor = new StmtExecutor(ctx, queryStr); + stmtExecutor.execute(); + Planner planner = stmtExecutor.planner(); + List fragments = planner.getFragments(); + Assert.assertEquals(1, fragments.size()); + PlanFragment fragment = fragments.get(0); + Assert.assertTrue(fragment.getPlanRoot() instanceof OlapScanNode); + Assert.assertEquals(0, fragment.getChildren().size()); + } +} diff --git a/fe/src/test/java/org/apache/doris/utframe/MockedBackend.java b/fe/src/test/java/org/apache/doris/utframe/MockedBackend.java new file mode 100644 index 0000000000..bd65b6c26d --- /dev/null +++ b/fe/src/test/java/org/apache/doris/utframe/MockedBackend.java @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.utframe; + +import org.apache.doris.common.ThriftServer; +import org.apache.doris.thrift.BackendService; +import org.apache.doris.thrift.HeartbeatService; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.utframe.MockedBackendFactory.BeThriftService; + +import com.baidu.jprotobuf.pbrpc.transport.RpcServer; + +import org.apache.thrift.TProcessor; + +import java.io.IOException; + +/* + * Mocked Backend + * A mocked Backend has 3 rpc services. + * HeartbeatService.Iface to handle heart beat from Frontend. + * BeThriftService to handle agent tasks and other requests from Frontend. + * BRpcService to handle the query request from Frontend. + * + * Users can create a BE by customizing three rpc services. + * + * Better to create a mocked Backend from MockedBackendFactory. + * In MockedBackendFactory, there default rpc service for above 3 rpc services. + */ +public class MockedBackend { + + private ThriftServer heartbeatServer; + private ThriftServer beThriftServer; + private RpcServer rpcServer; + + private String host; + private int heartbeatPort; + private int thriftPort; + private int brpcPort; + private int httpPort; + // the fe address: fe host and fe rpc port. + // This must be set explicitly after creating mocked Backend + private TNetworkAddress feAddress; + + public MockedBackend(String host, int heartbeatPort, int thriftPort, int brpcPort, int httpPort, + HeartbeatService.Iface hbService, + BeThriftService backendService, + Object pBackendService) throws IOException { + + this.host = host; + this.heartbeatPort = heartbeatPort; + this.thriftPort = thriftPort; + this.brpcPort = brpcPort; + this.httpPort = httpPort; + + createHeartbeatService(heartbeatPort, hbService); + createBeThriftService(thriftPort, backendService); + createBrpcService(brpcPort, pBackendService); + + backendService.setBackend(this); + backendService.init(); + } + + public void setFeAddress(TNetworkAddress feAddress) { + this.feAddress = feAddress; + } + + public TNetworkAddress getFeAddress() { + return feAddress; + } + + public String getHost() { + return host; + } + + public int getHeartbeatPort() { + return heartbeatPort; + } + + public int getBeThriftPort() { + return thriftPort; + } + + public int getBrpcPort() { + return brpcPort; + } + + public int getHttpPort() { + return httpPort; + } + + public void start() throws IOException { + heartbeatServer.start(); + System.out.println("Be heartbeat service is started with port: " + heartbeatPort); + beThriftServer.start(); + System.out.println("Be thrift service is started with port: " + thriftPort); + rpcServer.start(brpcPort); + System.out.println("Be brpc service is started with port: " + brpcPort); + } + + private void createHeartbeatService(int heartbeatPort, HeartbeatService.Iface serviceImpl) throws IOException { + TProcessor tprocessor = new HeartbeatService.Processor(serviceImpl); + heartbeatServer = new ThriftServer(heartbeatPort, tprocessor); + } + + private void createBeThriftService(int beThriftPort, BackendService.Iface serviceImpl) throws IOException { + TProcessor tprocessor = new BackendService.Processor(serviceImpl); + beThriftServer = new ThriftServer(beThriftPort, tprocessor); + } + + private void createBrpcService(int brpcPort, Object pBackendServiceImpl) { + rpcServer = new RpcServer(); + rpcServer.registerService(pBackendServiceImpl); + } +} diff --git a/fe/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java new file mode 100644 index 0000000000..1253673416 --- /dev/null +++ b/fe/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -0,0 +1,337 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.utframe; + +import org.apache.doris.common.ClientPool; +import org.apache.doris.proto.PCancelPlanFragmentRequest; +import org.apache.doris.proto.PCancelPlanFragmentResult; +import org.apache.doris.proto.PExecPlanFragmentResult; +import org.apache.doris.proto.PFetchDataResult; +import org.apache.doris.proto.PProxyRequest; +import org.apache.doris.proto.PProxyResult; +import org.apache.doris.proto.PQueryStatistics; +import org.apache.doris.proto.PStatus; +import org.apache.doris.proto.PTriggerProfileReportResult; +import org.apache.doris.rpc.PExecPlanFragmentRequest; +import org.apache.doris.rpc.PFetchDataRequest; +import org.apache.doris.rpc.PTriggerProfileReportRequest; +import org.apache.doris.thrift.BackendService; +import org.apache.doris.thrift.FrontendService; +import org.apache.doris.thrift.HeartbeatService; +import org.apache.doris.thrift.TAgentPublishRequest; +import org.apache.doris.thrift.TAgentResult; +import org.apache.doris.thrift.TAgentTaskRequest; +import org.apache.doris.thrift.TBackend; +import org.apache.doris.thrift.TBackendInfo; +import org.apache.doris.thrift.TCancelPlanFragmentParams; +import org.apache.doris.thrift.TCancelPlanFragmentResult; +import org.apache.doris.thrift.TDeleteEtlFilesRequest; +import org.apache.doris.thrift.TEtlState; +import org.apache.doris.thrift.TExecPlanFragmentParams; +import org.apache.doris.thrift.TExecPlanFragmentResult; +import org.apache.doris.thrift.TExportState; +import org.apache.doris.thrift.TExportStatusResult; +import org.apache.doris.thrift.TExportTaskRequest; +import org.apache.doris.thrift.TFetchDataParams; +import org.apache.doris.thrift.TFetchDataResult; +import org.apache.doris.thrift.TFinishTaskRequest; +import org.apache.doris.thrift.THeartbeatResult; +import org.apache.doris.thrift.TMasterInfo; +import org.apache.doris.thrift.TMiniLoadEtlStatusRequest; +import org.apache.doris.thrift.TMiniLoadEtlStatusResult; +import org.apache.doris.thrift.TMiniLoadEtlTaskRequest; +import org.apache.doris.thrift.TRoutineLoadTask; +import org.apache.doris.thrift.TScanBatchResult; +import org.apache.doris.thrift.TScanCloseParams; +import org.apache.doris.thrift.TScanCloseResult; +import org.apache.doris.thrift.TScanNextBatchParams; +import org.apache.doris.thrift.TScanOpenParams; +import org.apache.doris.thrift.TScanOpenResult; +import org.apache.doris.thrift.TSnapshotRequest; +import org.apache.doris.thrift.TStatus; +import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.thrift.TTabletStatResult; +import org.apache.doris.thrift.TTransmitDataParams; +import org.apache.doris.thrift.TTransmitDataResult; +import org.apache.doris.thrift.TUniqueId; + +import com.baidu.jprotobuf.pbrpc.ProtobufRPCService; +import com.google.common.collect.Maps; +import com.google.common.collect.Queues; + +import org.apache.thrift.TException; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.BlockingQueue; + +/* + * This class is used to create mock backends. + * Usage can be found in Demon.java's beforeClass() + * + * + */ +public class MockedBackendFactory { + + public static final String BE_DEFAULT_IP = "127.0.0.1"; + public static final int BE_DEFAULT_HEARTBEAT_PORT = 9050; + public static final int BE_DEFAULT_THRIFT_PORT = 9060; + public static final int BE_DEFAULT_BRPC_PORT = 8060; + public static final int BE_DEFAULT_HTTP_PORT = 8040; + + // create a default mocked backend with 3 default rpc services + public static MockedBackend createDefaultBackend() throws IOException { + return createBackend(BE_DEFAULT_IP, BE_DEFAULT_HEARTBEAT_PORT, BE_DEFAULT_THRIFT_PORT, BE_DEFAULT_BRPC_PORT, BE_DEFAULT_HTTP_PORT, + new DefaultHeartbeatServiceImpl(BE_DEFAULT_THRIFT_PORT, BE_DEFAULT_HTTP_PORT, BE_DEFAULT_BRPC_PORT), + new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl()); + } + + // create a mocked backend with customize parameters + public static MockedBackend createBackend(String host, int heartbeatPort, int thriftPort, int brpcPort, int httpPort, + HeartbeatService.Iface hbService, BeThriftService beThriftService, Object pBackendService) + throws IOException { + MockedBackend backend = new MockedBackend(host, heartbeatPort, thriftPort, brpcPort, httpPort, hbService, + beThriftService, pBackendService); + return backend; + } + + // the default hearbeat service. + // User can implement HeartbeatService.Iface to create other custom heartbeat service. + public static class DefaultHeartbeatServiceImpl implements HeartbeatService.Iface { + private int beThriftPort; + private int beHttpPort; + private int beBrpcPort; + + public DefaultHeartbeatServiceImpl(int beThriftPort, int beHttpPort, int beBrpcPort) { + this.beThriftPort = beThriftPort; + this.beHttpPort = beHttpPort; + this.beBrpcPort = beBrpcPort; + } + + @Override + public THeartbeatResult heartbeat(TMasterInfo master_info) throws TException { + TBackendInfo backendInfo = new TBackendInfo(beThriftPort, beHttpPort); + backendInfo.setBrpc_port(beBrpcPort); + THeartbeatResult result = new THeartbeatResult(new TStatus(TStatusCode.OK), backendInfo); + return result; + } + } + + // abstract BeThriftService. + // User can extends this abstract class to create other custom be thrift service + public static abstract class BeThriftService implements BackendService.Iface { + protected MockedBackend backend; + + public void setBackend(MockedBackend backend) { + this.backend = backend; + } + + public abstract void init(); + } + + // the default be thrift service extends from BeThriftService + public static class DefaultBeThriftServiceImpl extends BeThriftService { + // task queue to save all agent tasks coming from Frontend + private BlockingQueue taskQueue = Queues.newLinkedBlockingQueue(); + private TBackend tBackend; + private long reportVersion = 0; + + public DefaultBeThriftServiceImpl() { + } + + @Override + public void init() { + tBackend = new TBackend(backend.getHost(), backend.getBeThriftPort(), backend.getHttpPort()); + // start a thread to handle all agent tasks in taskQueue. + // Only return information that the task was successfully executed. + new Thread(new Runnable() { + @Override + public void run() { + while (true) { + try { + TAgentTaskRequest request = taskQueue.take(); + System.out.println("get agent task request. type: " + request.getTask_type() + + ", signature: " + request.getSignature()); + TFinishTaskRequest finishTaskRequest = new TFinishTaskRequest(tBackend, + request.getTask_type(), request.getSignature(), new TStatus(TStatusCode.OK)); + finishTaskRequest.setReport_version(++reportVersion); + + FrontendService.Client client = ClientPool.frontendPool.borrowObject(backend.getFeAddress(), 2000); + System.out.println("get fe " + backend.getFeAddress() + " client: " + client); + client.finishTask(finishTaskRequest); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + }).start(); + } + + @Override + public TExecPlanFragmentResult exec_plan_fragment(TExecPlanFragmentParams params) throws TException { + return null; + } + + @Override + public TCancelPlanFragmentResult cancel_plan_fragment(TCancelPlanFragmentParams params) throws TException { + return null; + } + + @Override + public TTransmitDataResult transmit_data(TTransmitDataParams params) throws TException { + return null; + } + + @Override + public TFetchDataResult fetch_data(TFetchDataParams params) throws TException { + return null; + } + + @Override + public TAgentResult submit_tasks(List tasks) throws TException { + for (TAgentTaskRequest request : tasks) { + taskQueue.add(request); + System.out.println("receive agent task request. type: " + request.getTask_type() + ", signature: " + + request.getSignature()); + } + return new TAgentResult(new TStatus(TStatusCode.OK)); + } + + @Override + public TAgentResult make_snapshot(TSnapshotRequest snapshot_request) throws TException { + return new TAgentResult(new TStatus(TStatusCode.OK)); + } + + @Override + public TAgentResult release_snapshot(String snapshot_path) throws TException { + return new TAgentResult(new TStatus(TStatusCode.OK)); + } + + @Override + public TAgentResult publish_cluster_state(TAgentPublishRequest request) throws TException { + return new TAgentResult(new TStatus(TStatusCode.OK)); + } + + @Override + public TAgentResult submit_etl_task(TMiniLoadEtlTaskRequest request) throws TException { + return new TAgentResult(new TStatus(TStatusCode.OK)); + } + + @Override + public TMiniLoadEtlStatusResult get_etl_status(TMiniLoadEtlStatusRequest request) throws TException { + return new TMiniLoadEtlStatusResult(new TStatus(TStatusCode.OK), TEtlState.FINISHED); + } + + @Override + public TAgentResult delete_etl_files(TDeleteEtlFilesRequest request) throws TException { + return new TAgentResult(new TStatus(TStatusCode.OK)); + } + + @Override + public TStatus submit_export_task(TExportTaskRequest request) throws TException { + return new TStatus(TStatusCode.OK); + } + + @Override + public TExportStatusResult get_export_status(TUniqueId task_id) throws TException { + return new TExportStatusResult(new TStatus(TStatusCode.OK), TExportState.FINISHED); + } + + @Override + public TStatus erase_export_task(TUniqueId task_id) throws TException { + return new TStatus(TStatusCode.OK); + } + + @Override + public TTabletStatResult get_tablet_stat() throws TException { + return new TTabletStatResult(Maps.newHashMap()); + } + + @Override + public TStatus submit_routine_load_task(List tasks) throws TException { + return new TStatus(TStatusCode.OK); + } + + @Override + public TScanOpenResult open_scanner(TScanOpenParams params) throws TException { + return null; + } + + @Override + public TScanBatchResult get_next(TScanNextBatchParams params) throws TException { + return null; + } + + @Override + public TScanCloseResult close_scanner(TScanCloseParams params) throws TException { + return null; + } + } + + // The default Brpc service. + // TODO(cmy): Currently this service cannot correctly simulate the processing of query requests. + public static class DefaultPBackendServiceImpl { + @ProtobufRPCService(serviceName = "PBackendService", methodName = "exec_plan_fragment") + public PExecPlanFragmentResult exec_plan_fragment(PExecPlanFragmentRequest request) { + System.out.println("get exec_plan_fragment request"); + PExecPlanFragmentResult result = new PExecPlanFragmentResult(); + PStatus pStatus = new PStatus(); + pStatus.status_code = 0; + result.status = pStatus; + return result; + } + + @ProtobufRPCService(serviceName = "PBackendService", methodName = "cancel_plan_fragment") + public PCancelPlanFragmentResult cancel_plan_fragment(PCancelPlanFragmentRequest request) { + System.out.println("get cancel_plan_fragment request"); + PCancelPlanFragmentResult result = new PCancelPlanFragmentResult(); + PStatus pStatus = new PStatus(); + pStatus.status_code = 0; + result.status = pStatus; + return result; + } + + @ProtobufRPCService(serviceName = "PBackendService", methodName = "fetch_data") + public PFetchDataResult fetchDataAsync(PFetchDataRequest request) { + System.out.println("get fetch_data"); + PFetchDataResult result = new PFetchDataResult(); + PStatus pStatus = new PStatus(); + pStatus.status_code = 0; + + PQueryStatistics pQueryStatistics = new PQueryStatistics(); + pQueryStatistics.scan_rows = 0L; + pQueryStatistics.scan_bytes = 0L; + + result.status = pStatus; + result.packet_seq = 0L; + result.query_statistics = pQueryStatistics; + result.eos = true; + return result; + } + + @ProtobufRPCService(serviceName = "PBackendService", methodName = "trigger_profile_report") + public PTriggerProfileReportResult triggerProfileReport(PTriggerProfileReportRequest request) { + return null; + } + + @ProtobufRPCService(serviceName = "PBackendService", methodName = "get_info") + public PProxyResult getInfo(PProxyRequest request) { + return null; + } + } +} diff --git a/fe/src/test/java/org/apache/doris/utframe/MockedFrontend.java b/fe/src/test/java/org/apache/doris/utframe/MockedFrontend.java new file mode 100644 index 0000000000..0768481001 --- /dev/null +++ b/fe/src/test/java/org/apache/doris/utframe/MockedFrontend.java @@ -0,0 +1,237 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.utframe; + +import org.apache.doris.PaloFe; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.util.PrintableMap; + +import com.google.common.base.Strings; +import com.google.common.collect.Maps; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Comparator; +import java.util.Map; + +/* + * This class is used to start a Frontend process locally, for unit test. + * This is a singleton class. There can be only one instance of this class globally. + * Usage: + * MockedFrontend mockedFrontend = MockedFrontend.getInstance(); + * mockedFrontend.init(confMap); + * mockedFrontend.start(new String[0]); + * + * ... + * + * confMap is a instance of Map. + * Here you can add any FE configuration you want to add. For example: + * confMap.put("http_port", "8032"); + * + * FrontendProcess already contains a minimal set of FE configurations. + * Any configuration in confMap will form the final fe.conf file with this minimal set. + * + * 1 environment variable must be set: + * DORIS_HOME/ + * + * The running dir is set when calling init(); + * There will be 3 directories under running dir/: + * running dir/conf/ + * running dir/log/ + * running dir/palo-meta/ + * + * All these 3 directories will be cleared first. + * + */ +public class MockedFrontend { + public static final String FE_PROCESS = "fe"; + + // the running dir of this mocked frontend. + // log/ palo-meta/ and conf/ dirs will be created under this dir. + private String runningDir; + // the min set of fe.conf. + private static final Map MIN_FE_CONF; + + private Map finalFeConf; + + static { + MIN_FE_CONF = Maps.newHashMap(); + MIN_FE_CONF.put("sys_log_level", "INFO"); + MIN_FE_CONF.put("http_port", "8030"); + MIN_FE_CONF.put("rpc_port", "9020"); + MIN_FE_CONF.put("query_port", "9030"); + MIN_FE_CONF.put("edit_log_port", "9010"); + MIN_FE_CONF.put("priority_networks", "127.0.0.1/24"); + MIN_FE_CONF.put("sys_log_verbose_modules", "org"); + } + + private static class SingletonHolder { + private static final MockedFrontend INSTANCE = new MockedFrontend(); + } + + public static MockedFrontend getInstance() { + return SingletonHolder.INSTANCE; + } + + public int getRpcPort() { + return Integer.valueOf(finalFeConf.get("rpc_port")); + } + + private boolean isInit = false; + + // init the fe process. This must be called before starting the frontend process. + // 1. check if all neccessary environment variables are set. + // 2. clear and create 3 dirs: runningDir/log/, runningDir/palo-meta/, runningDir/conf/ + // 3. init fe.conf + // The content of "fe.conf" is a merge set of input `feConf` and MIN_FE_CONF + public void init(String runningDir, Map feConf) throws EnvVarNotSetException, IOException { + if (isInit) { + return; + } + + if (Strings.isNullOrEmpty(runningDir)) { + System.err.println("running dir is not set for mocked frontend"); + throw new EnvVarNotSetException("running dir is not set for mocked frontend"); + } + + this.runningDir = runningDir; + System.out.println("mocked frontend running in dir: " + this.runningDir); + + // root running dir + createAndClearDir(this.runningDir); + // clear and create log dir + createAndClearDir(runningDir + "/log/"); + // clear and create meta dir + createAndClearDir(runningDir + "/palo-meta/"); + // clear and create conf dir + createAndClearDir(runningDir + "/conf/"); + // init fe.conf + initFeConf(runningDir + "/conf/", feConf); + + isInit = true; + } + + private void initFeConf(String confDir, Map feConf) throws IOException { + finalFeConf = Maps.newHashMap(MIN_FE_CONF); + // these 2 configs depends on running dir, so set them here. + finalFeConf.put("LOG_DIR", this.runningDir + "/log"); + finalFeConf.put("meta_dir", this.runningDir + "/palo-meta"); + finalFeConf.put("sys_log_dir", this.runningDir + "/log"); + finalFeConf.put("audit_log_dir", this.runningDir + "/log"); + finalFeConf.put("tmp_dir", this.runningDir + "/temp_dir"); + // use custom config to add or override default config + finalFeConf.putAll(feConf); + + PrintableMap map = new PrintableMap<>(finalFeConf, "=", false, true, ""); + File confFile = new File(confDir + "fe.conf"); + if (!confFile.exists()) { + confFile.createNewFile(); + } + PrintWriter printWriter = new PrintWriter(confFile); + try { + printWriter.print(map.toString()); + printWriter.flush(); + } finally { + printWriter.close(); + } + } + + // clear the specified dir, and create a empty one + private void createAndClearDir(String dir) throws IOException { + File localDir = new File(dir); + if (!localDir.exists()) { + localDir.mkdirs(); + } else { + Files.walk(Paths.get(dir)).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); + if (!localDir.exists()) { + localDir.mkdirs(); + } + } + } + + public String getRunningDir() { + return runningDir; + } + + private static class FERunnable implements Runnable { + private MockedFrontend frontend; + private String[] args; + + public FERunnable(MockedFrontend frontend, String[] args) { + this.frontend = frontend; + this.args = args; + } + + @Override + public void run() { + PaloFe.start(frontend.getRunningDir(), frontend.getRunningDir(), args); + } + } + + // must call init() before start. + public void start(String[] args) throws FeStartException, NotInitException { + if (!isInit) { + throw new NotInitException("fe process is not initialized"); + } + Thread feThread = new Thread(new FERunnable(this, args), FE_PROCESS); + feThread.start(); + // wait the catalog to be ready until timeout (30 seconds) + waitForCatalogReady(30 * 1000); + System.out.println("Fe process is started"); + } + + private void waitForCatalogReady(long timeoutMs) throws FeStartException { + long left = timeoutMs; + while (!Catalog.getCurrentCatalog().isReady() && left > 0) { + System.out.println("catalog is not ready"); + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + left -= 5000; + } + + if (left <= 0 && !Catalog.getCurrentCatalog().isReady()) { + throw new FeStartException("fe start failed"); + } + } + + public static class FeStartException extends Exception { + public FeStartException(String msg) { + super(msg); + } + } + + public static class EnvVarNotSetException extends Exception { + public EnvVarNotSetException(String msg) { + super(msg); + } + } + + public static class NotInitException extends Exception { + public NotInitException(String msg) { + super(msg); + } + } + +} diff --git a/fe/src/test/java/org/apache/doris/utframe/UtFrameUtils.java b/fe/src/test/java/org/apache/doris/utframe/UtFrameUtils.java new file mode 100644 index 0000000000..d6811a246a --- /dev/null +++ b/fe/src/test/java/org/apache/doris/utframe/UtFrameUtils.java @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.utframe; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.SqlParser; +import org.apache.doris.analysis.SqlScanner; +import org.apache.doris.analysis.StatementBase; +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.mysql.privilege.PaloAuth; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.SystemInfoService; + +import java.io.StringReader; + +public class UtFrameUtils { + + // Help to create a mocked ConnectContext. + public static ConnectContext createDefaultCtx() { + ConnectContext ctx = new ConnectContext(); + ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER); + ctx.setCurrentUserIdentity(UserIdentity.ROOT); + ctx.setQualifiedUser(PaloAuth.ROOT_USER); + ctx.setCatalog(Catalog.getCurrentCatalog()); + ctx.setThreadLocalInfo(); + return ctx; + } + + // Parse an origin stmt and analyze it. Return a StatementBase instance. + public static StatementBase parseAndAnalyzeStmt(String originStmt, ConnectContext ctx) + throws Exception { + System.out.println("begin to parse stmt: " + originStmt); + SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode()); + SqlParser parser = new SqlParser(input); + + StatementBase statementBase = (StatementBase) parser.parse().value; + Analyzer analyzer = new Analyzer(ctx.getCatalog(), ctx); + statementBase.analyze(analyzer); + return statementBase; + } +} diff --git a/run-fe-ut.sh b/run-fe-ut.sh index 4d68a87dcd..e3b2705a49 100755 --- a/run-fe-ut.sh +++ b/run-fe-ut.sh @@ -90,7 +90,10 @@ if [ ${COVERAGE} -eq 1 ]; then ant cover-test else if [ ${RUN} -eq 1 ]; then - echo "Run the specified class" + echo "Run the specified class: $1" + # eg: + # sh run-fe-ut.sh --run org.apache.doris.utframe.Demo + # sh run-fe-ut.sh --run org.apache.doris.utframe.Demo#testCreateDbAndTable+test2 $MVN test -D test=$1 else echo "Run Frontend UT"