diff --git a/docker/runtime/doris-compose/cluster.py b/docker/runtime/doris-compose/cluster.py index 0f6afb906a..d78c035f07 100644 --- a/docker/runtime/doris-compose/cluster.py +++ b/docker/runtime/doris-compose/cluster.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. +import filelock import json import jsonpickle import os @@ -22,10 +23,8 @@ import os.path import utils DOCKER_DORIS_PATH = "/opt/apache-doris" -LOCAL_DORIS_PATH = os.getenv("LOCAL_DORIS_PATH") -if not LOCAL_DORIS_PATH: - LOCAL_DORIS_PATH = "/tmp/doris" - +LOCAL_DORIS_PATH = os.getenv("LOCAL_DORIS_PATH", "/tmp/doris") +DORIS_SUBNET_START = int(os.getenv("DORIS_SUBNET_START", 128)) LOCAL_RESOURCE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "resource") DOCKER_RESOURCE_PATH = os.path.join(DOCKER_DORIS_PATH, "resource") @@ -59,17 +58,30 @@ def get_status_path(cluster_name): return os.path.join(get_cluster_path(cluster_name), "status") +def get_all_cluster_names(): + if not os.path.exists(LOCAL_DORIS_PATH): + return [] + else: + return [ + subdir for subdir in os.listdir(LOCAL_DORIS_PATH) + if os.path.isdir(os.path.join(LOCAL_DORIS_PATH, subdir)) + ] + + +def lock_network(): + return filelock.FileLock(os.path.join(LOCAL_DORIS_PATH, "lock")) + + def gen_subnet_prefix16(): used_subnet = utils.get_docker_subnets_prefix16() - if os.path.exists(LOCAL_DORIS_PATH): - for cluster_name in os.listdir(LOCAL_DORIS_PATH): - try: - cluster = Cluster.load(cluster_name) - used_subnet[cluster.subnet] = True - except: - pass + for cluster_name in get_all_cluster_names(): + try: + cluster = Cluster.load(cluster_name) + used_subnet[cluster.subnet] = True + except: + pass - for i in range(128, 192): + for i in range(DORIS_SUBNET_START, 192): for j in range(256): subnet = "{}.{}".format(i, j) if not used_subnet.get(subnet, False): @@ -392,12 +404,14 @@ class Cluster(object): @staticmethod def new(name, image, fe_config, be_config, be_disks, coverage_dir): - subnet = gen_subnet_prefix16() - cluster = Cluster(name, subnet, image, fe_config, be_config, be_disks, - coverage_dir) - os.makedirs(cluster.get_path(), exist_ok=True) - os.makedirs(get_status_path(name), exist_ok=True) - return cluster + with filelock.FileLock(os.path.join(LOCAL_DORIS_PATH, "lock")): + subnet = gen_subnet_prefix16() + cluster = Cluster(name, subnet, image, fe_config, be_config, + be_disks, coverage_dir) + os.makedirs(cluster.get_path(), exist_ok=True) + os.makedirs(get_status_path(name), exist_ok=True) + cluster._save_meta() + return cluster @staticmethod def load(name): diff --git a/docker/runtime/doris-compose/command.py b/docker/runtime/doris-compose/command.py index cfc3f137e1..4d1d6e7edb 100644 --- a/docker/runtime/doris-compose/command.py +++ b/docker/runtime/doris-compose/command.py @@ -359,6 +359,10 @@ class DownCommand(Command): utils.exec_docker_compose_command(cluster.get_compose_file(), "down", ["-v", "--remove-orphans"]) + try: + utils.remove_docker_network(cluster.name) + except Exception as e: + LOG.warn("prune network has exception: " + str(e)) if args.clean: utils.enable_dir_with_rw_perm(cluster.get_path()) shutil.rmtree(cluster.get_path()) @@ -544,8 +548,8 @@ class ListCommand(Command): search_names = [] if args.NAME: search_names = args.NAME - elif os.path.exists(CLUSTER.LOCAL_DORIS_PATH): - search_names = os.listdir(CLUSTER.LOCAL_DORIS_PATH) + else: + search_names = CLUSTER.get_all_cluster_names() for cluster_name in search_names: status, services = parse_cluster_compose_file(cluster_name) diff --git a/docker/runtime/doris-compose/requirements.txt b/docker/runtime/doris-compose/requirements.txt index 7d03121e89..ac177eddf8 100644 --- a/docker/runtime/doris-compose/requirements.txt +++ b/docker/runtime/doris-compose/requirements.txt @@ -17,6 +17,7 @@ docker docker-compose +filelock jsonpickle prettytable pymysql diff --git a/docker/runtime/doris-compose/resource/common.sh b/docker/runtime/doris-compose/resource/common.sh index 8971ac2515..5342eb0e54 100644 --- a/docker/runtime/doris-compose/resource/common.sh +++ b/docker/runtime/doris-compose/resource/common.sh @@ -45,7 +45,8 @@ wait_pid() { health_log "wait process $pid" while true; do - if [ ! ps -p $pid ] >/dev/null; then + ps -p $pid >/dev/null + if [ $? -ne 0 ]; then break fi sleep 1s diff --git a/docker/runtime/doris-compose/utils.py b/docker/runtime/doris-compose/utils.py index c9c0c6593b..8b4b39619b 100644 --- a/docker/runtime/doris-compose/utils.py +++ b/docker/runtime/doris-compose/utils.py @@ -168,6 +168,13 @@ def get_doris_running_containers(cluster_name): } +def remove_docker_network(cluster_name): + client = docker.client.from_env() + for network in client.networks.list( + names=[cluster_name + "_" + with_doris_prefix(cluster_name)]): + network.remove() + + def is_dir_empty(dir): return False if os.listdir(dir) else True diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy index 039913b724..47b642af48 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy @@ -34,7 +34,7 @@ class ClusterOptions { int feNum = 1 int beNum = 3 - List feConfigs = [] + List feConfigs = ['heartbeat_interval_second=5'] List beConfigs = [] // each be disks, a disks format is: disk_type=disk_num[,disk_capacity] @@ -85,6 +85,10 @@ class ServerNode { node.alive = fields.get(header.indexOf('alive')) == 'true' } + static long toLongOrDefault(Object val, long defValue) { + return val == '' ? defValue : (long) val + } + def getHttpAddress() { return [host, httpPort] } @@ -137,8 +141,8 @@ class Backend extends ServerNode { static Backend fromCompose(ListHeader header, int index, List fields) { Backend be = new Backend() ServerNode.fromCompose(be, header, index, fields) - be.backendId = (long) fields.get(header.indexOf('backend_id')) - be.tabletNum = (int) fields.get(header.indexOf('tablet_num')) + be.backendId = toLongOrDefault(fields.get(header.indexOf('backend_id')), -1L) + be.tabletNum = (int) toLongOrDefault(fields.get(header.indexOf('tablet_num')), 0L) return be } @@ -438,6 +442,7 @@ class SuiteCluster { } private void waitHbChanged() { + // heart beat interval is 5s Thread.sleep(7000) } @@ -467,14 +472,15 @@ class SuiteCluster { } else { proc.waitFor() } - def out = outBuf.toString() - def err = errBuf.toString() - if (proc.exitValue()) { + if (proc.exitValue() != 0) { throw new Exception(String.format('Exit value: %s != 0, stdout: %s, stderr: %s', - proc.exitValue(), out, err)) + proc.exitValue(), outBuf.toString(), errBuf.toString())) } def parser = new JsonSlurper() - def object = (Map) parser.parseText(out) + if (outBuf.toString().size() == 0) { + throw new Exception(String.format('doris compose output is empty, err: %s', errBuf.toString())) + } + def object = (Map) parser.parseText(outBuf.toString()) if (object.get('code') != 0) { throw new Exception(String.format('Code: %s != 0, err: %s', object.get('code'), object.get('err'))) }