[UnitTest] Support starting mocked FE and BE process in unit test (#2826)
This CL implements a simulated FE process and a simulated BE service. See `fe/src/test/java/org/apache/doris/utframe/DemoTest.java` for example usage. It also changes the maven-surefire-plugin configuration so that each unit test runs in a separate JVM, which avoids conflicts caused by the various singleton classes in FE. Starting a separate JVM for each unit test adds roughly 30% extra run time, but you can control the concurrency of unit tests via the `forkCount` setting of the maven-surefire-plugin in `fe/pom.xml`. The default configuration is still 1, to keep the output log easy to read. If set to 3, the entire FE unit test suite runs in about 4 minutes.
This commit is contained in:
167
fe/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java
Normal file
167
fe/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java
Normal file
@ -0,0 +1,167 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.utframe;
|
||||
|
||||
import org.apache.doris.analysis.CreateDbStmt;
|
||||
import org.apache.doris.analysis.CreateTableStmt;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.planner.OlapScanNode;
|
||||
import org.apache.doris.planner.PlanFragment;
|
||||
import org.apache.doris.planner.Planner;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.utframe.MockedBackendFactory.DefaultBeThriftServiceImpl;
|
||||
import org.apache.doris.utframe.MockedBackendFactory.DefaultHeartbeatServiceImpl;
|
||||
import org.apache.doris.utframe.MockedBackendFactory.DefaultPBackendServiceImpl;
|
||||
import org.apache.doris.utframe.MockedFrontend.EnvVarNotSetException;
|
||||
import org.apache.doris.utframe.MockedFrontend.FeStartException;
|
||||
import org.apache.doris.utframe.MockedFrontend.NotInitException;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
|
||||
/*
|
||||
* This demo is mainly used to confirm that
|
||||
* repeatedly starting FE and BE in 2 UnitTest will not cause conflict
|
||||
*/
|
||||
public class AnotherDemoTest {
|
||||
|
||||
private static int fe_http_port;
|
||||
private static int fe_rpc_port;
|
||||
private static int fe_query_port;
|
||||
private static int fe_edit_log_port;
|
||||
|
||||
private static int be_heartbeat_port;
|
||||
private static int be_thrift_port;
|
||||
private static int be_brpc_port;
|
||||
private static int be_http_port;
|
||||
|
||||
// use a unique dir so that it won't be conflict with other unit test which
|
||||
// may also start a Mocked Frontend
|
||||
private static String runningDir = "fe/mocked/AnotherDemoTest/" + UUID.randomUUID().toString() + "/";
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws EnvVarNotSetException, IOException,
|
||||
FeStartException, NotInitException, DdlException, InterruptedException {
|
||||
// get DORIS_HOME
|
||||
final String dorisHome = System.getenv("DORIS_HOME");
|
||||
if (Strings.isNullOrEmpty(dorisHome)) {
|
||||
throw new EnvVarNotSetException("env DORIS_HOME is not set");
|
||||
}
|
||||
|
||||
getRandomPort();
|
||||
|
||||
// start fe in "DORIS_HOME/fe/mocked/"
|
||||
MockedFrontend frontend = MockedFrontend.getInstance();
|
||||
Map<String, String> feConfMap = Maps.newHashMap();
|
||||
// set additional fe config
|
||||
feConfMap.put("http_port", String.valueOf(fe_http_port));
|
||||
feConfMap.put("rpc_port", String.valueOf(fe_rpc_port));
|
||||
feConfMap.put("query_port", String.valueOf(fe_query_port));
|
||||
feConfMap.put("edit_log_port", String.valueOf(fe_edit_log_port));
|
||||
feConfMap.put("tablet_create_timeout_second", "10");
|
||||
frontend.init(dorisHome + "/" + runningDir, feConfMap);
|
||||
frontend.start(new String[0]);
|
||||
|
||||
// start be
|
||||
MockedBackend backend = MockedBackendFactory.createBackend("127.0.0.1",
|
||||
be_heartbeat_port, be_thrift_port, be_brpc_port, be_http_port,
|
||||
new DefaultHeartbeatServiceImpl(be_thrift_port, be_http_port, be_brpc_port),
|
||||
new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl());
|
||||
backend.setFeAddress(new TNetworkAddress("127.0.0.1", frontend.getRpcPort()));
|
||||
backend.start();
|
||||
|
||||
// add be
|
||||
List<Pair<String, Integer>> bes = Lists.newArrayList();
|
||||
bes.add(Pair.create(backend.getHost(), backend.getHeartbeatPort()));
|
||||
Catalog.getCurrentSystemInfo().addBackends(bes, false, "default_cluster");
|
||||
|
||||
// sleep to wait first heartbeat
|
||||
Thread.sleep(5000);
|
||||
}
|
||||
|
||||
// generate all port from between 20000 ~ 30000
|
||||
private static void getRandomPort() {
|
||||
Random r = new Random(System.currentTimeMillis());
|
||||
int basePort = 20000 + r.nextInt(9000);
|
||||
fe_http_port = basePort + 1;
|
||||
fe_rpc_port = basePort + 2;
|
||||
fe_query_port = basePort + 3;
|
||||
fe_edit_log_port = basePort + 4;
|
||||
|
||||
be_heartbeat_port = basePort + 5;
|
||||
be_thrift_port = basePort + 6;
|
||||
be_brpc_port = basePort + 7;
|
||||
be_http_port = basePort + 8;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDbAndTable() throws Exception {
|
||||
// 1. create connect context
|
||||
ConnectContext ctx = UtFrameUtils.createDefaultCtx();
|
||||
// 2. create database db1
|
||||
String createDbStmtStr = "create database db1;";
|
||||
CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, ctx);
|
||||
Catalog.getCurrentCatalog().createDb(createDbStmt);
|
||||
System.out.println(Catalog.getCurrentCatalog().getDbNames());
|
||||
// 3. create table tbl1
|
||||
String createTblStmtStr = "create table db1.tbl1(k1 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');";
|
||||
CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr, ctx);
|
||||
Catalog.getCurrentCatalog().createTable(createTableStmt);
|
||||
// 4. get and test the created db and table
|
||||
Database db = Catalog.getCurrentCatalog().getDb("default_cluster:db1");
|
||||
Assert.assertNotNull(db);
|
||||
db.readLock();
|
||||
try {
|
||||
OlapTable tbl = (OlapTable) db.getTable("tbl1");
|
||||
Assert.assertNotNull(tbl);
|
||||
System.out.println(tbl.getName());
|
||||
Assert.assertEquals("Doris", tbl.getEngine());
|
||||
Assert.assertEquals(1, tbl.getBaseSchema().size());
|
||||
} finally {
|
||||
db.readUnlock();
|
||||
}
|
||||
// 5. query
|
||||
// TODO: we can not process real query for now. So it has to be a explain query
|
||||
String queryStr = "explain select * from db1.tbl1";
|
||||
StmtExecutor stmtExecutor = new StmtExecutor(ctx, queryStr);
|
||||
stmtExecutor.execute();
|
||||
Planner planner = stmtExecutor.planner();
|
||||
List<PlanFragment> fragments = planner.getFragments();
|
||||
Assert.assertEquals(1, fragments.size());
|
||||
PlanFragment fragment = fragments.get(0);
|
||||
Assert.assertTrue(fragment.getPlanRoot() instanceof OlapScanNode);
|
||||
Assert.assertEquals(0, fragment.getChildren().size());
|
||||
}
|
||||
}
|
||||
194
fe/src/test/java/org/apache/doris/utframe/DemoTest.java
Normal file
194
fe/src/test/java/org/apache/doris/utframe/DemoTest.java
Normal file
@ -0,0 +1,194 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.utframe;
|
||||
|
||||
import org.apache.doris.alter.AlterJobV2;
|
||||
import org.apache.doris.analysis.AlterTableStmt;
|
||||
import org.apache.doris.analysis.CreateDbStmt;
|
||||
import org.apache.doris.analysis.CreateTableStmt;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.planner.OlapScanNode;
|
||||
import org.apache.doris.planner.PlanFragment;
|
||||
import org.apache.doris.planner.Planner;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.utframe.MockedBackendFactory.DefaultBeThriftServiceImpl;
|
||||
import org.apache.doris.utframe.MockedBackendFactory.DefaultHeartbeatServiceImpl;
|
||||
import org.apache.doris.utframe.MockedBackendFactory.DefaultPBackendServiceImpl;
|
||||
import org.apache.doris.utframe.MockedFrontend.EnvVarNotSetException;
|
||||
import org.apache.doris.utframe.MockedFrontend.FeStartException;
|
||||
import org.apache.doris.utframe.MockedFrontend.NotInitException;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
|
||||
/*
|
||||
* This demo shows how to run unit test with mocked FE and BE.
|
||||
* It will
|
||||
* 1. start a mocked FE and a mocked BE.
|
||||
* 2. Create a database and a tbl.
|
||||
* 3. Make a schema change to tbl.
|
||||
* 4. send a query and get query plan
|
||||
*/
|
||||
public class DemoTest {
|
||||
|
||||
private static int fe_http_port;
|
||||
private static int fe_rpc_port;
|
||||
private static int fe_query_port;
|
||||
private static int fe_edit_log_port;
|
||||
|
||||
private static int be_heartbeat_port;
|
||||
private static int be_thrift_port;
|
||||
private static int be_brpc_port;
|
||||
private static int be_http_port;
|
||||
// use a unique dir so that it won't be conflict with other unit test which
|
||||
// may also start a Mocked Frontend
|
||||
private static String runningDir = "fe/mocked/DemoTest/" + UUID.randomUUID().toString() + "/";
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws EnvVarNotSetException, IOException,
|
||||
FeStartException, NotInitException, DdlException, InterruptedException {
|
||||
// get DORIS_HOME
|
||||
final String dorisHome = System.getenv("DORIS_HOME");
|
||||
if (Strings.isNullOrEmpty(dorisHome)) {
|
||||
throw new EnvVarNotSetException("env DORIS_HOME is not set");
|
||||
}
|
||||
|
||||
getRandomPort();
|
||||
|
||||
// start fe in "DORIS_HOME/fe/mocked/"
|
||||
MockedFrontend frontend = MockedFrontend.getInstance();
|
||||
Map<String, String> feConfMap = Maps.newHashMap();
|
||||
// set additional fe config
|
||||
feConfMap.put("http_port", String.valueOf(fe_http_port));
|
||||
feConfMap.put("rpc_port", String.valueOf(fe_rpc_port));
|
||||
feConfMap.put("query_port", String.valueOf(fe_query_port));
|
||||
feConfMap.put("edit_log_port", String.valueOf(fe_edit_log_port));
|
||||
feConfMap.put("tablet_create_timeout_second", "10");
|
||||
frontend.init(dorisHome + "/" + runningDir, feConfMap);
|
||||
frontend.start(new String[0]);
|
||||
|
||||
// start be
|
||||
MockedBackend backend = MockedBackendFactory.createBackend("127.0.0.1",
|
||||
be_heartbeat_port, be_thrift_port, be_brpc_port, be_http_port,
|
||||
new DefaultHeartbeatServiceImpl(be_thrift_port, be_http_port, be_brpc_port),
|
||||
new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl());
|
||||
backend.setFeAddress(new TNetworkAddress("127.0.0.1", frontend.getRpcPort()));
|
||||
backend.start();
|
||||
|
||||
// add be
|
||||
List<Pair<String, Integer>> bes = Lists.newArrayList();
|
||||
bes.add(Pair.create(backend.getHost(), backend.getHeartbeatPort()));
|
||||
Catalog.getCurrentSystemInfo().addBackends(bes, false, "default_cluster");
|
||||
|
||||
// sleep to wait first heartbeat
|
||||
Thread.sleep(6000);
|
||||
}
|
||||
|
||||
// generate all port from between 20000 ~ 30000
|
||||
private static void getRandomPort() {
|
||||
Random r = new Random(System.currentTimeMillis());
|
||||
int basePort = 20000 + r.nextInt(9000);
|
||||
fe_http_port = basePort + 1;
|
||||
fe_rpc_port = basePort + 2;
|
||||
fe_query_port = basePort + 3;
|
||||
fe_edit_log_port = basePort + 4;
|
||||
|
||||
be_heartbeat_port = basePort + 5;
|
||||
be_thrift_port = basePort + 6;
|
||||
be_brpc_port = basePort + 7;
|
||||
be_http_port = basePort + 8;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDbAndTable() throws Exception {
|
||||
// 1. create connect context
|
||||
ConnectContext ctx = UtFrameUtils.createDefaultCtx();
|
||||
// 2. create database db1
|
||||
String createDbStmtStr = "create database db1;";
|
||||
CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, ctx);
|
||||
Catalog.getCurrentCatalog().createDb(createDbStmt);
|
||||
System.out.println(Catalog.getCurrentCatalog().getDbNames());
|
||||
// 3. create table tbl1
|
||||
String createTblStmtStr = "create table db1.tbl1(k1 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');";
|
||||
CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr, ctx);
|
||||
Catalog.getCurrentCatalog().createTable(createTableStmt);
|
||||
// 4. get and test the created db and table
|
||||
Database db = Catalog.getCurrentCatalog().getDb("default_cluster:db1");
|
||||
Assert.assertNotNull(db);
|
||||
db.readLock();
|
||||
try {
|
||||
OlapTable tbl = (OlapTable) db.getTable("tbl1");
|
||||
Assert.assertNotNull(tbl);
|
||||
System.out.println(tbl.getName());
|
||||
Assert.assertEquals("Doris", tbl.getEngine());
|
||||
Assert.assertEquals(1, tbl.getBaseSchema().size());
|
||||
} finally {
|
||||
db.readUnlock();
|
||||
}
|
||||
// 5. process a schema change job
|
||||
String alterStmtStr = "alter table db1.tbl1 add column k2 int default '1'";
|
||||
AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, ctx);
|
||||
Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
|
||||
// 6. check alter job
|
||||
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getSchemaChangeHandler().getAlterJobsV2();
|
||||
Assert.assertEquals(1, alterJobs.size());
|
||||
for (AlterJobV2 alterJobV2 : alterJobs.values()) {
|
||||
while (!alterJobV2.getJobState().isFinalState()) {
|
||||
System.out.println("alter job " + alterJobV2.getDbId() + " is running. state: " + alterJobV2.getJobState());
|
||||
Thread.sleep(5000);
|
||||
}
|
||||
System.out.println("alter job " + alterJobV2.getDbId() + " is done. state: " + alterJobV2.getJobState());
|
||||
Assert.assertEquals(AlterJobV2.JobState.FINISHED, alterJobV2.getJobState());
|
||||
}
|
||||
db.readLock();
|
||||
try {
|
||||
OlapTable tbl = (OlapTable) db.getTable("tbl1");
|
||||
Assert.assertEquals(2, tbl.getBaseSchema().size());
|
||||
} finally {
|
||||
db.readUnlock();
|
||||
}
|
||||
// 7. query
|
||||
// TODO: we can not process real query for now. So it has to be a explain query
|
||||
String queryStr = "explain select * from db1.tbl1";
|
||||
StmtExecutor stmtExecutor = new StmtExecutor(ctx, queryStr);
|
||||
stmtExecutor.execute();
|
||||
Planner planner = stmtExecutor.planner();
|
||||
List<PlanFragment> fragments = planner.getFragments();
|
||||
Assert.assertEquals(1, fragments.size());
|
||||
PlanFragment fragment = fragments.get(0);
|
||||
Assert.assertTrue(fragment.getPlanRoot() instanceof OlapScanNode);
|
||||
Assert.assertEquals(0, fragment.getChildren().size());
|
||||
}
|
||||
}
|
||||
129
fe/src/test/java/org/apache/doris/utframe/MockedBackend.java
Normal file
129
fe/src/test/java/org/apache/doris/utframe/MockedBackend.java
Normal file
@ -0,0 +1,129 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.utframe;
|
||||
|
||||
import org.apache.doris.common.ThriftServer;
|
||||
import org.apache.doris.thrift.BackendService;
|
||||
import org.apache.doris.thrift.HeartbeatService;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.utframe.MockedBackendFactory.BeThriftService;
|
||||
|
||||
import com.baidu.jprotobuf.pbrpc.transport.RpcServer;
|
||||
|
||||
import org.apache.thrift.TProcessor;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/*
|
||||
* Mocked Backend
|
||||
* A mocked Backend has 3 rpc services.
|
||||
* HeartbeatService.Iface to handle heart beat from Frontend.
|
||||
* BeThriftService to handle agent tasks and other requests from Frontend.
|
||||
* BRpcService to handle the query request from Frontend.
|
||||
*
|
||||
* Users can create a BE by customizing three rpc services.
|
||||
*
|
||||
* Better to create a mocked Backend from MockedBackendFactory.
|
||||
* In MockedBackendFactory, there default rpc service for above 3 rpc services.
|
||||
*/
|
||||
public class MockedBackend {
|
||||
|
||||
private ThriftServer heartbeatServer;
|
||||
private ThriftServer beThriftServer;
|
||||
private RpcServer rpcServer;
|
||||
|
||||
private String host;
|
||||
private int heartbeatPort;
|
||||
private int thriftPort;
|
||||
private int brpcPort;
|
||||
private int httpPort;
|
||||
// the fe address: fe host and fe rpc port.
|
||||
// This must be set explicitly after creating mocked Backend
|
||||
private TNetworkAddress feAddress;
|
||||
|
||||
public MockedBackend(String host, int heartbeatPort, int thriftPort, int brpcPort, int httpPort,
|
||||
HeartbeatService.Iface hbService,
|
||||
BeThriftService backendService,
|
||||
Object pBackendService) throws IOException {
|
||||
|
||||
this.host = host;
|
||||
this.heartbeatPort = heartbeatPort;
|
||||
this.thriftPort = thriftPort;
|
||||
this.brpcPort = brpcPort;
|
||||
this.httpPort = httpPort;
|
||||
|
||||
createHeartbeatService(heartbeatPort, hbService);
|
||||
createBeThriftService(thriftPort, backendService);
|
||||
createBrpcService(brpcPort, pBackendService);
|
||||
|
||||
backendService.setBackend(this);
|
||||
backendService.init();
|
||||
}
|
||||
|
||||
public void setFeAddress(TNetworkAddress feAddress) {
|
||||
this.feAddress = feAddress;
|
||||
}
|
||||
|
||||
public TNetworkAddress getFeAddress() {
|
||||
return feAddress;
|
||||
}
|
||||
|
||||
public String getHost() {
|
||||
return host;
|
||||
}
|
||||
|
||||
public int getHeartbeatPort() {
|
||||
return heartbeatPort;
|
||||
}
|
||||
|
||||
public int getBeThriftPort() {
|
||||
return thriftPort;
|
||||
}
|
||||
|
||||
public int getBrpcPort() {
|
||||
return brpcPort;
|
||||
}
|
||||
|
||||
public int getHttpPort() {
|
||||
return httpPort;
|
||||
}
|
||||
|
||||
public void start() throws IOException {
|
||||
heartbeatServer.start();
|
||||
System.out.println("Be heartbeat service is started with port: " + heartbeatPort);
|
||||
beThriftServer.start();
|
||||
System.out.println("Be thrift service is started with port: " + thriftPort);
|
||||
rpcServer.start(brpcPort);
|
||||
System.out.println("Be brpc service is started with port: " + brpcPort);
|
||||
}
|
||||
|
||||
private void createHeartbeatService(int heartbeatPort, HeartbeatService.Iface serviceImpl) throws IOException {
|
||||
TProcessor tprocessor = new HeartbeatService.Processor<HeartbeatService.Iface>(serviceImpl);
|
||||
heartbeatServer = new ThriftServer(heartbeatPort, tprocessor);
|
||||
}
|
||||
|
||||
private void createBeThriftService(int beThriftPort, BackendService.Iface serviceImpl) throws IOException {
|
||||
TProcessor tprocessor = new BackendService.Processor<BackendService.Iface>(serviceImpl);
|
||||
beThriftServer = new ThriftServer(beThriftPort, tprocessor);
|
||||
}
|
||||
|
||||
private void createBrpcService(int brpcPort, Object pBackendServiceImpl) {
|
||||
rpcServer = new RpcServer();
|
||||
rpcServer.registerService(pBackendServiceImpl);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,337 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.utframe;
|
||||
|
||||
import org.apache.doris.common.ClientPool;
|
||||
import org.apache.doris.proto.PCancelPlanFragmentRequest;
|
||||
import org.apache.doris.proto.PCancelPlanFragmentResult;
|
||||
import org.apache.doris.proto.PExecPlanFragmentResult;
|
||||
import org.apache.doris.proto.PFetchDataResult;
|
||||
import org.apache.doris.proto.PProxyRequest;
|
||||
import org.apache.doris.proto.PProxyResult;
|
||||
import org.apache.doris.proto.PQueryStatistics;
|
||||
import org.apache.doris.proto.PStatus;
|
||||
import org.apache.doris.proto.PTriggerProfileReportResult;
|
||||
import org.apache.doris.rpc.PExecPlanFragmentRequest;
|
||||
import org.apache.doris.rpc.PFetchDataRequest;
|
||||
import org.apache.doris.rpc.PTriggerProfileReportRequest;
|
||||
import org.apache.doris.thrift.BackendService;
|
||||
import org.apache.doris.thrift.FrontendService;
|
||||
import org.apache.doris.thrift.HeartbeatService;
|
||||
import org.apache.doris.thrift.TAgentPublishRequest;
|
||||
import org.apache.doris.thrift.TAgentResult;
|
||||
import org.apache.doris.thrift.TAgentTaskRequest;
|
||||
import org.apache.doris.thrift.TBackend;
|
||||
import org.apache.doris.thrift.TBackendInfo;
|
||||
import org.apache.doris.thrift.TCancelPlanFragmentParams;
|
||||
import org.apache.doris.thrift.TCancelPlanFragmentResult;
|
||||
import org.apache.doris.thrift.TDeleteEtlFilesRequest;
|
||||
import org.apache.doris.thrift.TEtlState;
|
||||
import org.apache.doris.thrift.TExecPlanFragmentParams;
|
||||
import org.apache.doris.thrift.TExecPlanFragmentResult;
|
||||
import org.apache.doris.thrift.TExportState;
|
||||
import org.apache.doris.thrift.TExportStatusResult;
|
||||
import org.apache.doris.thrift.TExportTaskRequest;
|
||||
import org.apache.doris.thrift.TFetchDataParams;
|
||||
import org.apache.doris.thrift.TFetchDataResult;
|
||||
import org.apache.doris.thrift.TFinishTaskRequest;
|
||||
import org.apache.doris.thrift.THeartbeatResult;
|
||||
import org.apache.doris.thrift.TMasterInfo;
|
||||
import org.apache.doris.thrift.TMiniLoadEtlStatusRequest;
|
||||
import org.apache.doris.thrift.TMiniLoadEtlStatusResult;
|
||||
import org.apache.doris.thrift.TMiniLoadEtlTaskRequest;
|
||||
import org.apache.doris.thrift.TRoutineLoadTask;
|
||||
import org.apache.doris.thrift.TScanBatchResult;
|
||||
import org.apache.doris.thrift.TScanCloseParams;
|
||||
import org.apache.doris.thrift.TScanCloseResult;
|
||||
import org.apache.doris.thrift.TScanNextBatchParams;
|
||||
import org.apache.doris.thrift.TScanOpenParams;
|
||||
import org.apache.doris.thrift.TScanOpenResult;
|
||||
import org.apache.doris.thrift.TSnapshotRequest;
|
||||
import org.apache.doris.thrift.TStatus;
|
||||
import org.apache.doris.thrift.TStatusCode;
|
||||
import org.apache.doris.thrift.TTabletStatResult;
|
||||
import org.apache.doris.thrift.TTransmitDataParams;
|
||||
import org.apache.doris.thrift.TTransmitDataResult;
|
||||
import org.apache.doris.thrift.TUniqueId;
|
||||
|
||||
import com.baidu.jprotobuf.pbrpc.ProtobufRPCService;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Queues;
|
||||
|
||||
import org.apache.thrift.TException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
||||
/*
|
||||
* This class is used to create mock backends.
|
||||
* Usage can be found in DemoTest.java's beforeClass()
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class MockedBackendFactory {
|
||||
|
||||
    // Default ip and ports used by createDefaultBackend().
    public static final String BE_DEFAULT_IP = "127.0.0.1";
    public static final int BE_DEFAULT_HEARTBEAT_PORT = 9050;
    public static final int BE_DEFAULT_THRIFT_PORT = 9060;
    public static final int BE_DEFAULT_BRPC_PORT = 8060;
    public static final int BE_DEFAULT_HTTP_PORT = 8040;
|
||||
|
||||
// create a default mocked backend with 3 default rpc services
|
||||
public static MockedBackend createDefaultBackend() throws IOException {
|
||||
return createBackend(BE_DEFAULT_IP, BE_DEFAULT_HEARTBEAT_PORT, BE_DEFAULT_THRIFT_PORT, BE_DEFAULT_BRPC_PORT, BE_DEFAULT_HTTP_PORT,
|
||||
new DefaultHeartbeatServiceImpl(BE_DEFAULT_THRIFT_PORT, BE_DEFAULT_HTTP_PORT, BE_DEFAULT_BRPC_PORT),
|
||||
new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl());
|
||||
}
|
||||
|
||||
// create a mocked backend with customize parameters
|
||||
public static MockedBackend createBackend(String host, int heartbeatPort, int thriftPort, int brpcPort, int httpPort,
|
||||
HeartbeatService.Iface hbService, BeThriftService beThriftService, Object pBackendService)
|
||||
throws IOException {
|
||||
MockedBackend backend = new MockedBackend(host, heartbeatPort, thriftPort, brpcPort, httpPort, hbService,
|
||||
beThriftService, pBackendService);
|
||||
return backend;
|
||||
}
|
||||
|
||||
// the default hearbeat service.
|
||||
// User can implement HeartbeatService.Iface to create other custom heartbeat service.
|
||||
public static class DefaultHeartbeatServiceImpl implements HeartbeatService.Iface {
|
||||
private int beThriftPort;
|
||||
private int beHttpPort;
|
||||
private int beBrpcPort;
|
||||
|
||||
public DefaultHeartbeatServiceImpl(int beThriftPort, int beHttpPort, int beBrpcPort) {
|
||||
this.beThriftPort = beThriftPort;
|
||||
this.beHttpPort = beHttpPort;
|
||||
this.beBrpcPort = beBrpcPort;
|
||||
}
|
||||
|
||||
@Override
|
||||
public THeartbeatResult heartbeat(TMasterInfo master_info) throws TException {
|
||||
TBackendInfo backendInfo = new TBackendInfo(beThriftPort, beHttpPort);
|
||||
backendInfo.setBrpc_port(beBrpcPort);
|
||||
THeartbeatResult result = new THeartbeatResult(new TStatus(TStatusCode.OK), backendInfo);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
    // abstract BeThriftService.
    // User can extend this abstract class to create other custom be thrift service.
    public static abstract class BeThriftService implements BackendService.Iface {
        // The owning mocked backend; set by MockedBackend's constructor before init() is called.
        protected MockedBackend backend;

        public void setBackend(MockedBackend backend) {
            this.backend = backend;
        }

        // Called once by MockedBackend's constructor, after setBackend().
        public abstract void init();
    }
|
||||
|
||||
// the default be thrift service extends from BeThriftService
|
||||
public static class DefaultBeThriftServiceImpl extends BeThriftService {
|
||||
        // task queue to save all agent tasks coming from Frontend
        private BlockingQueue<TAgentTaskRequest> taskQueue = Queues.newLinkedBlockingQueue();
        // identity of this mocked BE, reported back to the FE when finishing tasks
        private TBackend tBackend;
        // monotonically increasing version attached to every task-finish report
        private long reportVersion = 0;

        public DefaultBeThriftServiceImpl() {
        }
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
tBackend = new TBackend(backend.getHost(), backend.getBeThriftPort(), backend.getHttpPort());
|
||||
// start a thread to handle all agent tasks in taskQueue.
|
||||
// Only return information that the task was successfully executed.
|
||||
new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
while (true) {
|
||||
try {
|
||||
TAgentTaskRequest request = taskQueue.take();
|
||||
System.out.println("get agent task request. type: " + request.getTask_type()
|
||||
+ ", signature: " + request.getSignature());
|
||||
TFinishTaskRequest finishTaskRequest = new TFinishTaskRequest(tBackend,
|
||||
request.getTask_type(), request.getSignature(), new TStatus(TStatusCode.OK));
|
||||
finishTaskRequest.setReport_version(++reportVersion);
|
||||
|
||||
FrontendService.Client client = ClientPool.frontendPool.borrowObject(backend.getFeAddress(), 2000);
|
||||
System.out.println("get fe " + backend.getFeAddress() + " client: " + client);
|
||||
client.finishTask(finishTaskRequest);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
}
|
||||
|
||||
        // Query-execution rpcs are not supported by this mock; they return null.
        @Override
        public TExecPlanFragmentResult exec_plan_fragment(TExecPlanFragmentParams params) throws TException {
            return null;
        }

        @Override
        public TCancelPlanFragmentResult cancel_plan_fragment(TCancelPlanFragmentParams params) throws TException {
            return null;
        }

        @Override
        public TTransmitDataResult transmit_data(TTransmitDataParams params) throws TException {
            return null;
        }

        @Override
        public TFetchDataResult fetch_data(TFetchDataParams params) throws TException {
            return null;
        }
|
||||
|
||||
@Override
|
||||
public TAgentResult submit_tasks(List<TAgentTaskRequest> tasks) throws TException {
|
||||
for (TAgentTaskRequest request : tasks) {
|
||||
taskQueue.add(request);
|
||||
System.out.println("receive agent task request. type: " + request.getTask_type() + ", signature: "
|
||||
+ request.getSignature());
|
||||
}
|
||||
return new TAgentResult(new TStatus(TStatusCode.OK));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TAgentResult make_snapshot(TSnapshotRequest snapshot_request) throws TException {
|
||||
return new TAgentResult(new TStatus(TStatusCode.OK));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TAgentResult release_snapshot(String snapshot_path) throws TException {
|
||||
return new TAgentResult(new TStatus(TStatusCode.OK));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TAgentResult publish_cluster_state(TAgentPublishRequest request) throws TException {
|
||||
return new TAgentResult(new TStatus(TStatusCode.OK));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TAgentResult submit_etl_task(TMiniLoadEtlTaskRequest request) throws TException {
|
||||
return new TAgentResult(new TStatus(TStatusCode.OK));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TMiniLoadEtlStatusResult get_etl_status(TMiniLoadEtlStatusRequest request) throws TException {
|
||||
return new TMiniLoadEtlStatusResult(new TStatus(TStatusCode.OK), TEtlState.FINISHED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TAgentResult delete_etl_files(TDeleteEtlFilesRequest request) throws TException {
|
||||
return new TAgentResult(new TStatus(TStatusCode.OK));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TStatus submit_export_task(TExportTaskRequest request) throws TException {
|
||||
return new TStatus(TStatusCode.OK);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TExportStatusResult get_export_status(TUniqueId task_id) throws TException {
|
||||
return new TExportStatusResult(new TStatus(TStatusCode.OK), TExportState.FINISHED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TStatus erase_export_task(TUniqueId task_id) throws TException {
|
||||
return new TStatus(TStatusCode.OK);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TTabletStatResult get_tablet_stat() throws TException {
|
||||
return new TTabletStatResult(Maps.newHashMap());
|
||||
}
|
||||
|
||||
@Override
|
||||
public TStatus submit_routine_load_task(List<TRoutineLoadTask> tasks) throws TException {
|
||||
return new TStatus(TStatusCode.OK);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TScanOpenResult open_scanner(TScanOpenParams params) throws TException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TScanBatchResult get_next(TScanNextBatchParams params) throws TException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TScanCloseResult close_scanner(TScanCloseParams params) throws TException {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// The default Brpc service.
|
||||
// TODO(cmy): Currently this service cannot correctly simulate the processing of query requests.
|
||||
public static class DefaultPBackendServiceImpl {
|
||||
@ProtobufRPCService(serviceName = "PBackendService", methodName = "exec_plan_fragment")
|
||||
public PExecPlanFragmentResult exec_plan_fragment(PExecPlanFragmentRequest request) {
|
||||
System.out.println("get exec_plan_fragment request");
|
||||
PExecPlanFragmentResult result = new PExecPlanFragmentResult();
|
||||
PStatus pStatus = new PStatus();
|
||||
pStatus.status_code = 0;
|
||||
result.status = pStatus;
|
||||
return result;
|
||||
}
|
||||
|
||||
@ProtobufRPCService(serviceName = "PBackendService", methodName = "cancel_plan_fragment")
|
||||
public PCancelPlanFragmentResult cancel_plan_fragment(PCancelPlanFragmentRequest request) {
|
||||
System.out.println("get cancel_plan_fragment request");
|
||||
PCancelPlanFragmentResult result = new PCancelPlanFragmentResult();
|
||||
PStatus pStatus = new PStatus();
|
||||
pStatus.status_code = 0;
|
||||
result.status = pStatus;
|
||||
return result;
|
||||
}
|
||||
|
||||
@ProtobufRPCService(serviceName = "PBackendService", methodName = "fetch_data")
|
||||
public PFetchDataResult fetchDataAsync(PFetchDataRequest request) {
|
||||
System.out.println("get fetch_data");
|
||||
PFetchDataResult result = new PFetchDataResult();
|
||||
PStatus pStatus = new PStatus();
|
||||
pStatus.status_code = 0;
|
||||
|
||||
PQueryStatistics pQueryStatistics = new PQueryStatistics();
|
||||
pQueryStatistics.scan_rows = 0L;
|
||||
pQueryStatistics.scan_bytes = 0L;
|
||||
|
||||
result.status = pStatus;
|
||||
result.packet_seq = 0L;
|
||||
result.query_statistics = pQueryStatistics;
|
||||
result.eos = true;
|
||||
return result;
|
||||
}
|
||||
|
||||
@ProtobufRPCService(serviceName = "PBackendService", methodName = "trigger_profile_report")
|
||||
public PTriggerProfileReportResult triggerProfileReport(PTriggerProfileReportRequest request) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@ProtobufRPCService(serviceName = "PBackendService", methodName = "get_info")
|
||||
public PProxyResult getInfo(PProxyRequest request) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
237
fe/src/test/java/org/apache/doris/utframe/MockedFrontend.java
Normal file
237
fe/src/test/java/org/apache/doris/utframe/MockedFrontend.java
Normal file
@ -0,0 +1,237 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.utframe;
|
||||
|
||||
import org.apache.doris.PaloFe;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.common.util.PrintableMap;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Comparator;
|
||||
import java.util.Map;
|
||||
|
||||
/*
|
||||
* This class is used to start a Frontend process locally, for unit test.
|
||||
* This is a singleton class. There can be only one instance of this class globally.
|
||||
* Usage:
|
||||
* MockedFrontend mockedFrontend = MockedFrontend.getInstance();
|
||||
* mockedFrontend.init(confMap);
|
||||
* mockedFrontend.start(new String[0]);
|
||||
*
|
||||
* ...
|
||||
*
|
||||
* confMap is a instance of Map<String, String>.
|
||||
* Here you can add any FE configuration you want to add. For example:
|
||||
* confMap.put("http_port", "8032");
|
||||
*
|
||||
* FrontendProcess already contains a minimal set of FE configurations.
|
||||
* Any configuration in confMap will form the final fe.conf file with this minimal set.
|
||||
*
|
||||
* 1 environment variable must be set:
|
||||
* DORIS_HOME/
|
||||
*
|
||||
* The running dir is set when calling init();
|
||||
* There will be 3 directories under running dir/:
|
||||
* running dir/conf/
|
||||
* running dir/log/
|
||||
* running dir/palo-meta/
|
||||
*
|
||||
* All these 3 directories will be cleared first.
|
||||
*
|
||||
*/
|
||||
public class MockedFrontend {
|
||||
public static final String FE_PROCESS = "fe";
|
||||
|
||||
// the running dir of this mocked frontend.
|
||||
// log/ palo-meta/ and conf/ dirs will be created under this dir.
|
||||
private String runningDir;
|
||||
// the min set of fe.conf.
|
||||
private static final Map<String, String> MIN_FE_CONF;
|
||||
|
||||
private Map<String, String> finalFeConf;
|
||||
|
||||
static {
|
||||
MIN_FE_CONF = Maps.newHashMap();
|
||||
MIN_FE_CONF.put("sys_log_level", "INFO");
|
||||
MIN_FE_CONF.put("http_port", "8030");
|
||||
MIN_FE_CONF.put("rpc_port", "9020");
|
||||
MIN_FE_CONF.put("query_port", "9030");
|
||||
MIN_FE_CONF.put("edit_log_port", "9010");
|
||||
MIN_FE_CONF.put("priority_networks", "127.0.0.1/24");
|
||||
MIN_FE_CONF.put("sys_log_verbose_modules", "org");
|
||||
}
|
||||
|
||||
private static class SingletonHolder {
|
||||
private static final MockedFrontend INSTANCE = new MockedFrontend();
|
||||
}
|
||||
|
||||
public static MockedFrontend getInstance() {
|
||||
return SingletonHolder.INSTANCE;
|
||||
}
|
||||
|
||||
public int getRpcPort() {
|
||||
return Integer.valueOf(finalFeConf.get("rpc_port"));
|
||||
}
|
||||
|
||||
private boolean isInit = false;
|
||||
|
||||
// init the fe process. This must be called before starting the frontend process.
|
||||
// 1. check if all neccessary environment variables are set.
|
||||
// 2. clear and create 3 dirs: runningDir/log/, runningDir/palo-meta/, runningDir/conf/
|
||||
// 3. init fe.conf
|
||||
// The content of "fe.conf" is a merge set of input `feConf` and MIN_FE_CONF
|
||||
public void init(String runningDir, Map<String, String> feConf) throws EnvVarNotSetException, IOException {
|
||||
if (isInit) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (Strings.isNullOrEmpty(runningDir)) {
|
||||
System.err.println("running dir is not set for mocked frontend");
|
||||
throw new EnvVarNotSetException("running dir is not set for mocked frontend");
|
||||
}
|
||||
|
||||
this.runningDir = runningDir;
|
||||
System.out.println("mocked frontend running in dir: " + this.runningDir);
|
||||
|
||||
// root running dir
|
||||
createAndClearDir(this.runningDir);
|
||||
// clear and create log dir
|
||||
createAndClearDir(runningDir + "/log/");
|
||||
// clear and create meta dir
|
||||
createAndClearDir(runningDir + "/palo-meta/");
|
||||
// clear and create conf dir
|
||||
createAndClearDir(runningDir + "/conf/");
|
||||
// init fe.conf
|
||||
initFeConf(runningDir + "/conf/", feConf);
|
||||
|
||||
isInit = true;
|
||||
}
|
||||
|
||||
private void initFeConf(String confDir, Map<String, String> feConf) throws IOException {
|
||||
finalFeConf = Maps.newHashMap(MIN_FE_CONF);
|
||||
// these 2 configs depends on running dir, so set them here.
|
||||
finalFeConf.put("LOG_DIR", this.runningDir + "/log");
|
||||
finalFeConf.put("meta_dir", this.runningDir + "/palo-meta");
|
||||
finalFeConf.put("sys_log_dir", this.runningDir + "/log");
|
||||
finalFeConf.put("audit_log_dir", this.runningDir + "/log");
|
||||
finalFeConf.put("tmp_dir", this.runningDir + "/temp_dir");
|
||||
// use custom config to add or override default config
|
||||
finalFeConf.putAll(feConf);
|
||||
|
||||
PrintableMap<String, String> map = new PrintableMap<>(finalFeConf, "=", false, true, "");
|
||||
File confFile = new File(confDir + "fe.conf");
|
||||
if (!confFile.exists()) {
|
||||
confFile.createNewFile();
|
||||
}
|
||||
PrintWriter printWriter = new PrintWriter(confFile);
|
||||
try {
|
||||
printWriter.print(map.toString());
|
||||
printWriter.flush();
|
||||
} finally {
|
||||
printWriter.close();
|
||||
}
|
||||
}
|
||||
|
||||
// clear the specified dir, and create a empty one
|
||||
private void createAndClearDir(String dir) throws IOException {
|
||||
File localDir = new File(dir);
|
||||
if (!localDir.exists()) {
|
||||
localDir.mkdirs();
|
||||
} else {
|
||||
Files.walk(Paths.get(dir)).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
|
||||
if (!localDir.exists()) {
|
||||
localDir.mkdirs();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public String getRunningDir() {
|
||||
return runningDir;
|
||||
}
|
||||
|
||||
private static class FERunnable implements Runnable {
|
||||
private MockedFrontend frontend;
|
||||
private String[] args;
|
||||
|
||||
public FERunnable(MockedFrontend frontend, String[] args) {
|
||||
this.frontend = frontend;
|
||||
this.args = args;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
PaloFe.start(frontend.getRunningDir(), frontend.getRunningDir(), args);
|
||||
}
|
||||
}
|
||||
|
||||
// must call init() before start.
|
||||
public void start(String[] args) throws FeStartException, NotInitException {
|
||||
if (!isInit) {
|
||||
throw new NotInitException("fe process is not initialized");
|
||||
}
|
||||
Thread feThread = new Thread(new FERunnable(this, args), FE_PROCESS);
|
||||
feThread.start();
|
||||
// wait the catalog to be ready until timeout (30 seconds)
|
||||
waitForCatalogReady(30 * 1000);
|
||||
System.out.println("Fe process is started");
|
||||
}
|
||||
|
||||
private void waitForCatalogReady(long timeoutMs) throws FeStartException {
|
||||
long left = timeoutMs;
|
||||
while (!Catalog.getCurrentCatalog().isReady() && left > 0) {
|
||||
System.out.println("catalog is not ready");
|
||||
try {
|
||||
Thread.sleep(5000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
left -= 5000;
|
||||
}
|
||||
|
||||
if (left <= 0 && !Catalog.getCurrentCatalog().isReady()) {
|
||||
throw new FeStartException("fe start failed");
|
||||
}
|
||||
}
|
||||
|
||||
public static class FeStartException extends Exception {
|
||||
public FeStartException(String msg) {
|
||||
super(msg);
|
||||
}
|
||||
}
|
||||
|
||||
public static class EnvVarNotSetException extends Exception {
|
||||
public EnvVarNotSetException(String msg) {
|
||||
super(msg);
|
||||
}
|
||||
}
|
||||
|
||||
public static class NotInitException extends Exception {
|
||||
public NotInitException(String msg) {
|
||||
super(msg);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
57
fe/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
Normal file
57
fe/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
Normal file
@ -0,0 +1,57 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.utframe;
|
||||
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.SqlParser;
|
||||
import org.apache.doris.analysis.SqlScanner;
|
||||
import org.apache.doris.analysis.StatementBase;
|
||||
import org.apache.doris.analysis.UserIdentity;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.mysql.privilege.PaloAuth;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
|
||||
import java.io.StringReader;
|
||||
|
||||
public class UtFrameUtils {
|
||||
|
||||
// Help to create a mocked ConnectContext.
|
||||
public static ConnectContext createDefaultCtx() {
|
||||
ConnectContext ctx = new ConnectContext();
|
||||
ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER);
|
||||
ctx.setCurrentUserIdentity(UserIdentity.ROOT);
|
||||
ctx.setQualifiedUser(PaloAuth.ROOT_USER);
|
||||
ctx.setCatalog(Catalog.getCurrentCatalog());
|
||||
ctx.setThreadLocalInfo();
|
||||
return ctx;
|
||||
}
|
||||
|
||||
// Parse an origin stmt and analyze it. Return a StatementBase instance.
|
||||
public static StatementBase parseAndAnalyzeStmt(String originStmt, ConnectContext ctx)
|
||||
throws Exception {
|
||||
System.out.println("begin to parse stmt: " + originStmt);
|
||||
SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode());
|
||||
SqlParser parser = new SqlParser(input);
|
||||
|
||||
StatementBase statementBase = (StatementBase) parser.parse().value;
|
||||
Analyzer analyzer = new Analyzer(ctx.getCatalog(), ctx);
|
||||
statementBase.analyze(analyzer);
|
||||
return statementBase;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user