diff --git a/docker/runtime/doris-compose/cluster.py b/docker/runtime/doris-compose/cluster.py index 6c6ccf3953..d1edbd3f4c 100644 --- a/docker/runtime/doris-compose/cluster.py +++ b/docker/runtime/doris-compose/cluster.py @@ -316,38 +316,61 @@ class BE(Node): def expose_sub_dirs(self): return super().expose_sub_dirs() + ["storage"] - def init_disk(self, be_disk_num): - if not be_disk_num or be_disk_num <= 1: - return + def init_disk(self, be_disks): path = self.get_path() - for i in range(1, be_disk_num + 1): - os.makedirs("{}/storage/disk{}".format(path, i), exist_ok=True) + dirs = [] + dir_descs = [] + index = 0 + for disks in be_disks: + parts = disks.split(",") + if len(parts) != 1 and len(parts) != 2: + raise Exception("be disks has error: {}".format(disks)) + type_and_num = parts[0].split("=") + if len(type_and_num) != 2: + raise Exception("be disks has error: {}".format(disks)) + tp = type_and_num[0].strip().upper() + if tp != "HDD" and tp != "SSD": + raise Exception( + "error be disk type: '{}', should be 'HDD' or 'SSD'". + format(tp)) + num = int(type_and_num[1].strip()) + capactity = int(parts[1].strip()) if len(parts) >= 2 else -1 + capactity_desc = "_{}gb".format(capactity) if capactity > 0 else "" + + for i in range(num): + index += 1 + dir_name = "{}{}.{}".format(index, capactity_desc, tp) + dirs.append("{}/storage/{}".format(path, dir_name)) + dir_descs.append("${{DORIS_HOME}}/storage/{}{}".format( + dir_name, + ",capacity:" + str(capactity) if capactity > 0 else "")) + + for dir in dirs: + os.makedirs(dir, exist_ok=True) + with open("{}/conf/{}".format(path, self.conf_file_name()), "a") as f: - f.write("storage_root_path = " + ";".join([ - "${{DORIS_HOME}}/storage/disk{}".format(i) - for i in range(1, be_disk_num + 1) - ]) + "\n") + storage_root_path = ";".join(dir_descs) if dir_descs else '""' + f.write("storage_root_path = {}\n".format(storage_root_path)) class Cluster(object): - def __init__(self, name, subnet, image, fe_config, be_config, be_disk_num): + def __init__(self, name, subnet, image, fe_config, be_config, be_disks): self.name = name self.subnet = subnet self.image = image self.fe_config = fe_config self.be_config = be_config - self.be_disk_num = be_disk_num + self.be_disks = be_disks self.groups = { node_type: Group(node_type) for node_type in Node.TYPE_ALL } @staticmethod - def new(name, image, fe_config, be_config, be_disk_num): + def new(name, image, fe_config, be_config, be_disks): subnet = gen_subnet_prefix16() - cluster = Cluster(name, subnet, image, fe_config, be_config, - be_disk_num) + cluster = Cluster(name, subnet, image, fe_config, be_config, be_disks) os.makedirs(cluster.get_path(), exist_ok=True) os.makedirs(get_status_path(name), exist_ok=True) return cluster @@ -426,7 +449,7 @@ class Cluster(object): node.init_conf(self.fe_config) elif node.is_be(): node.init_conf(self.be_config) - node.init_disk(self.be_disk_num) + node.init_disk(self.be_disks) else: node.init_conf([]) return node diff --git a/docker/runtime/doris-compose/command.py b/docker/runtime/doris-compose/command.py index 65b19a561a..2797c1b30f 100644 --- a/docker/runtime/doris-compose/command.py +++ b/docker/runtime/doris-compose/command.py @@ -18,6 +18,7 @@ import argparse import cluster as CLUSTER import database +import dateutil.parser import utils import os import os.path @@ -78,15 +79,15 @@ class Command(object): parser.add_argument("--output-json", default=False, action=self._get_parser_bool_action(True), - help="output as json, and don't print log") + help="output as json, and don't print log.") def _add_parser_ids_args(self, parser): group = parser.add_argument_group("for existing nodes", "apply to the existing nodes.") group.add_argument("--fe-id", nargs="*", type=int, help="Specify up fe ids, support multiple ids, " \ - "if specific --fe-id but not specific ids, apply to all fe.") + "if specific --fe-id but not specific ids, apply to all fe. Example: '--fe-id 2 3' will select fe-2 and fe-3.") group.add_argument("--be-id", nargs="*", type=int, help="Specify up be ids, support multiple ids, " \ - "if specific --be but not specific ids, apply to all be.") + "if specific --be-id but not specific ids, apply to all be. Example: '--be-id' will select all backends.") def _get_parser_bool_action(self, is_store_true): if sys.version_info.major == 3 and sys.version_info.minor >= 9: @@ -140,7 +141,8 @@ class UpCommand(Command): type=int, default=0, help= - "Specify wait seconds for upping: 0(default) not wait, > 0 max wait seconds, -1 wait unlimited." + "Specify wait seconds for fe/be ready for service: 0 not wait (default), "\ + "> 0 max wait seconds, -1 wait unlimited." ) self._add_parser_output_json(parser) @@ -150,32 +152,44 @@ class UpCommand(Command): group1.add_argument( "--add-fe-num", type=int, - help="Specify add fe num, default 3 for a new cluster.") + help= + "Specify add fe num, default: 3 for a new cluster, 0 for a existing cluster." + ) group1.add_argument( "--add-be-num", type=int, - help="Specify add be num, default 3 for a new cluster.") + help= + "Specify add be num, default: 3 for a new cluster, 0 for a existing cluster." + ) group1.add_argument("--fe-config", nargs="*", type=str, - help="Specify fe configs.") + help="Specify fe configs for fe.conf. "\ + "Example: --fe-config \"enable_debug_points = true\" \"sys_log_level = ERROR\".") group1.add_argument("--be-config", nargs="*", type=str, - help="Specify be configs.") - group1.add_argument("--be-disk-num", - default=None, - type=int, - help="Specify be disk num, default is 1.") + help="Specify be configs for be.conf. "\ + "Example: --be-config \"enable_debug_points = true\" \"enable_auth = true\".") + group1.add_argument("--be-disks", + nargs="*", + default=["HDD=1"], + type=str, + help="Specify each be disks, each group is \"disk_type=disk_num[,disk_capactity]\", "\ + "disk_type is HDD or SSD, disk_capactity is capactity limit in gb. default: HDD=1. "\ + "Example: --be-disks \"HDD=1\", \"SSD=1,10\", \"SSD=2,100\""\ + "means each be has 1 HDD without capactity limit, 1 SSD with 10GB capactity limit, "\ + "2 SSD with 100GB capactity limit") self._add_parser_ids_args(parser) group2 = parser.add_mutually_exclusive_group() group2.add_argument( - "--no-start", - default=False, - action=self._get_parser_bool_action(True), - help="Not start containers, create or update config image only.") + "--start", + default=True, + action=self._get_parser_bool_action(False), + help="Start containers, default is true. If specific --no-start, "\ + "will create or update config image only but not start containers.") group2.add_argument("--force-recreate", default=False, action=self._get_parser_bool_action(True), @@ -204,7 +218,7 @@ class UpCommand(Command): utils.render_yellow("Ignore --be-id for new cluster")) cluster = CLUSTER.Cluster.new(args.NAME, args.IMAGE, args.fe_config, args.be_config, - args.be_disk_num) + args.be_disks) LOG.info("Create new cluster {} succ, cluster path is {}".format( args.NAME, cluster.get_path())) if not args.add_fe_num: @@ -236,7 +250,7 @@ class UpCommand(Command): cluster.save() options = [] - if args.no_start: + if not args.start: options.append("--no-start") else: options = ["-d", "--remove-orphans"] @@ -250,7 +264,7 @@ class UpCommand(Command): utils.exec_docker_compose_command(cluster.get_compose_file(), "up", options, related_nodes) - if args.no_start: + if not args.start: LOG.info( utils.render_green( "Not up cluster cause specific --no-start, related node num {}" @@ -314,7 +328,7 @@ class DownCommand(Command): default=False, action=self._get_parser_bool_action(True), help= - "Clean container related files, include expose data, config and logs" + "Clean container related files, include expose data, config and logs." ) parser.add_argument( "--drop-force", @@ -469,7 +483,7 @@ class ListCommand(Command): "--all", default=False, action=self._get_parser_bool_action(True), - help="Show all stopped and bad doris compose projects") + help="Show all clusters, include stopped or bad clusters.") parser.add_argument("--detail", default=False, action=self._get_parser_bool_action(True), @@ -549,7 +563,7 @@ class ListCommand(Command): TYPE_COMPOSESERVICE = type(ComposeService("", "", "")) if not args.NAME: - header = ("CLUSTER", "STATUS", "CONFIG FILES") + header = ("CLUSTER", "OWNER", "STATUS", "CONFIG FILES") rows = [] for name in sorted(clusters.keys()): cluster_info = clusters[name] @@ -565,10 +579,11 @@ class ListCommand(Command): ]) if not args.all and service_statuses.get("running", 0) == 0: continue + owner = utils.get_path_owner(CLUSTER.get_cluster_path(name)) compose_file = CLUSTER.get_compose_file(name) - rows.append( - (name, show_status, "{}{}".format(compose_file, - cluster_info["status"]))) + rows.append((name, owner, show_status, + "{}{}".format(compose_file, + cluster_info["status"]))) return self._handle_data(header, rows) header = [ @@ -609,9 +624,9 @@ class ListCommand(Command): node.image = container.image node.status = SERVICE_DEAD else: - node.created = container.attrs.get("Created", - "")[:19].replace( - "T", " ") + node.created = dateutil.parser.parse( + container.attrs.get("Created")).astimezone().strftime( + "%Y-%m-%d %H:%M:%S") node.ip = list( container.attrs["NetworkSettings"] ["Networks"].values())[0]["IPAMConfig"]["IPv4Address"] diff --git a/docker/runtime/doris-compose/database.py b/docker/runtime/doris-compose/database.py index 69b2cb50f2..d29905e94a 100644 --- a/docker/runtime/doris-compose/database.py +++ b/docker/runtime/doris-compose/database.py @@ -150,7 +150,8 @@ class DBManager(object): is_master = utils.is_true(is_master) alive = utils.is_true(alive) id = CLUSTER.Node.get_id_from_ip(ip) - query_port = query_ports.get(id, None) + query_port = query_ports.get(id, "") + last_heartbeat = utils.escape_null(last_heartbeat) fe = FEState(id, query_port, is_master, alive, last_heartbeat, err_msg) fe_states[id] = fe @@ -172,6 +173,7 @@ class DBManager(object): decommissioned = utils.is_true(decommissioned) tablet_num = int(tablet_num) id = CLUSTER.Node.get_id_from_ip(ip) + last_heartbeat = utils.escape_null(last_heartbeat) be = BEState(id, backend_id, decommissioned, alive, tablet_num, last_heartbeat, err_msg) be_states[id] = be diff --git a/docker/runtime/doris-compose/requirements.txt b/docker/runtime/doris-compose/requirements.txt index 039260e6c7..7d03121e89 100644 --- a/docker/runtime/doris-compose/requirements.txt +++ b/docker/runtime/doris-compose/requirements.txt @@ -20,3 +20,4 @@ docker-compose jsonpickle prettytable pymysql +python-dateutil diff --git a/docker/runtime/doris-compose/utils.py b/docker/runtime/doris-compose/utils.py index dfed11e8b0..c9c0c6593b 100644 --- a/docker/runtime/doris-compose/utils.py +++ b/docker/runtime/doris-compose/utils.py @@ -19,6 +19,7 @@ import docker import json import logging import os +import pwd import subprocess import time import yaml @@ -264,6 +265,13 @@ def enable_dir_with_rw_perm(dir): entrypoint="chmod a+rw -R {}".format("/opt/mount")) +def get_path_owner(path): + try: + return pwd.getpwuid(os.stat(path).st_uid).pw_name + except: + return "" + + def read_compose_file(file): with open(file, "r") as f: return yaml.safe_load(f.read()) @@ -280,3 +288,7 @@ def pretty_json(json_data): def is_true(val): return str(val) == "true" or str(val) == "1" + + +def escape_null(val): + return "" if val == "\\N" else val diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy index 29935caf25..5d1d3ec249 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy @@ -34,10 +34,17 @@ class ClusterOptions { int feNum = 1 int beNum = 3 - int beDiskNum = 1 List feConfigs = [] List beConfigs = [] + // each be disks, a disks format is: disk_type=disk_num[,disk_capacity] + // here disk_type=HDD or SSD, disk capacity is in gb unit. + // for example: beDisks = ["HDD=1", "SSD=2,10", "SSD=10,3"] means: + // each be has 1 HDD disks without capacity limit, 2 SSD disks with 10GB capacity limit, + // and 10 SSD disks with 3GB capacity limit + // if not specific, docker will let each be contains 1 HDD disk. + List beDisks = null + void enableDebugPoints() { feConfigs.add('enable_debug_points=true') beConfigs.add('enable_debug_points=true') @@ -184,7 +191,9 @@ class SuiteCluster { sb.append('--be-config ') options.beConfigs.forEach(item -> sb.append(' ' + item + ' ')) } - sb.append('--be-disk-num ' + options.beDiskNum + ' ') + if (options.beDisks != null) { + sb.append('--be-disks ' + options.beDisks.join(" ") + ' ') + } sb.append('--wait-timeout 180') runCmd(sb.toString(), -1) diff --git a/regression-test/suites/demo_p0/docker_action.groovy b/regression-test/suites/demo_p0/docker_action.groovy index 0dbb818e98..8c827aa40f 100644 --- a/regression-test/suites/demo_p0/docker_action.groovy +++ b/regression-test/suites/demo_p0/docker_action.groovy @@ -18,6 +18,7 @@ import org.apache.doris.regression.suite.ClusterOptions suite('docker_action') { + // run a new docker docker { sql '''create table tb1 (k int) DISTRIBUTED BY HASH(k) BUCKETS 10''' sql '''insert into tb1 values (1),(2),(3)''' @@ -37,14 +38,15 @@ suite('docker_action') { } } - // run another docker def options = new ClusterOptions() // add fe config items options.feConfigs = ['example_conf_k1=v1', 'example_conf_k2=v2'] // contains 5 backends options.beNum = 5 - // each backend has 3 disks - options.beDiskNum = 3 + // each backend has 1 HDD disk and 3 SSD disks + options.beDisks = ['HDD=1', 'SSD=3'] + + // run another docker docker(options) { sql '''create table tb1 (k int) DISTRIBUTED BY HASH(k) BUCKETS 10 properties ("replication_num"="5")''' }