# doris/docker/runtime/doris-compose/command.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import argparse
import cluster as CLUSTER
import database
import dateutil.parser
import utils
import os
import os.path
import prettytable
import shutil
import sys
import time
LOG = utils.get_logger()
# Returns a tuple (for_all, related_nodes, related_node_num).
# for_all is True when the selection covers every node in the cluster; in the
# all-None fast path related_nodes is None.
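# Example of the selection semantics (hypothetical cluster holding fe-1..fe-3
# and be-1..be-2, 5 nodes in total):
#   get_ids_related_nodes(cluster, [1, 2], None, None, None, None)
#       -> (False, [fe-1, fe-2], 2)          # only the listed fe nodes
#   get_ids_related_nodes(cluster, [], None, None, None, None)
#       -> (False, [fe-1, fe-2, fe-3], 3)    # empty list selects every fe
#   get_ids_related_nodes(cluster, None, None, None, None, None)
#       -> (True, None, 5)                   # all None selects the whole cluster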
def get_ids_related_nodes(cluster,
fe_ids,
be_ids,
ms_ids,
recycle_ids,
fdb_ids,
ignore_not_exists=False):
if fe_ids is None and be_ids is None and ms_ids is None and recycle_ids is None and fdb_ids is None:
return True, None, cluster.get_all_nodes_num()
def get_ids_related_nodes_with_type(node_type, ids):
if ids is None:
return []
if not ids:
return cluster.get_all_nodes(node_type)
else:
nodes = []
for id in ids:
try:
nodes.append(cluster.get_node(node_type, id))
except Exception as e:
if ignore_not_exists:
LOG.warning(
utils.render_yellow(
"Not found {} with id {}".format(
node_type, id)))
else:
raise e
return nodes
type_ids = [
(CLUSTER.Node.TYPE_FE, fe_ids),
(CLUSTER.Node.TYPE_BE, be_ids),
(CLUSTER.Node.TYPE_MS, ms_ids),
(CLUSTER.Node.TYPE_RECYCLE, recycle_ids),
(CLUSTER.Node.TYPE_FDB, fdb_ids),
]
nodes = []
for node_type, ids in type_ids:
nodes.extend(get_ids_related_nodes_with_type(node_type, ids))
    related_node_num = len(nodes)
    return related_node_num == cluster.get_all_nodes_num(
    ), nodes, related_node_num
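# Command is the abstract base of the CLI: each subclass registers its own
# argparse subparser in add_parser() and implements run(args). Whatever run()
# returns is, presumably, what --output-json prints (the dispatch itself lives
# in doris-compose.py next to this file).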
class Command(object):
def __init__(self, name):
self.name = name
def print_use_time(self):
return True
def add_parser(self, args_parsers):
raise Exception("No implemented")
def run(self, args):
raise Exception("No implemented")
def _add_parser_output_json(self, parser):
parser.add_argument("--output-json",
default=False,
action=self._get_parser_bool_action(True),
help="output as json, and don't print log.")
def _add_parser_ids_args(self, parser):
group = parser.add_argument_group("for existing nodes",
"apply to the existing nodes.")
group.add_argument("--fe-id", nargs="*", type=int, help="Specify up fe ids, support multiple ids, " \
"if specific --fe-id but not specific ids, apply to all fe. Example: '--fe-id 2 3' will select fe-2 and fe-3.")
group.add_argument("--be-id", nargs="*", type=int, help="Specify up be ids, support multiple ids, " \
"if specific --be-id but not specific ids, apply to all be. Example: '--be-id' will select all backends.")
group.add_argument(
"--ms-id",
nargs="*",
type=int,
help=
"Specify up ms ids, support multiple ids. Only use in cloud cluster."
)
group.add_argument(
"--recycle-id",
nargs="*",
type=int,
help=
"Specify up recycle ids, support multiple ids. Only use in cloud cluster."
)
group.add_argument(
"--fdb-id",
nargs="*",
type=int,
help=
"Specify up fdb ids, support multiple ids. Only use in cloud cluster."
)
def _get_parser_bool_action(self, is_store_true):
if self._support_boolean_action():
return argparse.BooleanOptionalAction
else:
return "store_true" if is_store_true else "store_false"
    def _support_boolean_action(self):
        # argparse.BooleanOptionalAction is only available on Python 3.9+.
        return sys.version_info >= (3, 9)
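# SimpleCommand forwards a docker compose verb (start, stop, restart, pause,
# unpause -- see ALL_COMMANDS at the bottom of this file) to the selected
# nodes of an existing cluster.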
class SimpleCommand(Command):
def __init__(self, command, help):
super().__init__(command)
self.command = command
self.help = help
def add_parser(self, args_parsers):
        help = self.help + " If none of --fe-id, --be-id, --ms-id, --recycle-id, --fdb-id is specified, "\
            "then apply to all containers."
parser = args_parsers.add_parser(self.command, help=help)
parser.add_argument("NAME", help="Specify cluster name.")
self._add_parser_ids_args(parser)
self._add_parser_output_json(parser)
def run(self, args):
cluster = CLUSTER.Cluster.load(args.NAME)
_, related_nodes, related_node_num = get_ids_related_nodes(
cluster, args.fe_id, args.be_id, args.ms_id, args.recycle_id,
args.fdb_id)
utils.exec_docker_compose_command(cluster.get_compose_file(),
self.command,
nodes=related_nodes)
show_cmd = self.command[0].upper() + self.command[1:]
LOG.info(
utils.render_green("{} succ, total related node num {}".format(
show_cmd, related_node_num)))
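# Example invocation (hypothetical cluster name 'mycluster'):
#   python docker/runtime/doris-compose/doris-compose.py stop mycluster --be-id 1 2
# stops only be-1 and be-2; omitting all id flags applies to every container.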
class UpCommand(Command):
def add_parser(self, args_parsers):
parser = args_parsers.add_parser("up", help="Create and upgrade doris containers, "\
"or add new containers. " \
"If none of --add-fe-num, --add-be-num, --add-ms-num, --add-recycle-num, "\
"--fe-id, --be-id, --ms-id, --recycle-id, --fdb-id is specific, " \
"then apply to all containers.")
parser.add_argument("NAME", default="", help="Specific cluster name.")
parser.add_argument("IMAGE",
default="",
nargs="?",
help="Specify docker image.")
parser.add_argument(
"--cloud",
default=False,
action=self._get_parser_bool_action(True),
help=
"Create cloud cluster, default is false. Only use when creating new cluster."
)
parser.add_argument(
"--wait-timeout",
type=int,
default=0,
help=
"Specify wait seconds for fe/be ready for service: 0 not wait (default), "\
"> 0 max wait seconds, -1 wait unlimited."
)
self._add_parser_output_json(parser)
group1 = parser.add_argument_group("add new nodes",
"add cluster nodes.")
group1.add_argument(
"--add-fe-num",
type=int,
help=
"Specify add fe num, default: 3 for a new cluster, 0 for a existing cluster."
)
group1.add_argument(
"--add-be-num",
type=int,
help=
"Specify add be num, default: 3 for a new cluster, 0 for a existing cluster."
)
group1.add_argument(
"--add-ms-num",
type=int,
help=
"Specify add ms num, default: 1 for a new cloud cluster, 0 for a existing cluster. Only use in cloud cluster"
)
group1.add_argument(
"--add-recycle-num",
type=int,
help=
"Specify add recycle num, default: 1 for a new cloud cluster, 0 for a existing cluster. Only use in cloud cluster"
)
group1.add_argument("--fe-config",
nargs="*",
type=str,
help="Specify fe configs for fe.conf. "\
"Example: --fe-config \"enable_debug_points = true\" \"sys_log_level = ERROR\".")
group1.add_argument("--be-config",
nargs="*",
type=str,
help="Specify be configs for be.conf. "\
"Example: --be-config \"enable_debug_points = true\" \"enable_auth = true\".")
group1.add_argument("--be-disks",
nargs="*",
default=["HDD=1"],
type=str,
help="Specify each be disks, each group is \"disk_type=disk_num[,disk_capactity]\", "\
"disk_type is HDD or SSD, disk_capactity is capactity limit in gb. default: HDD=1. "\
"Example: --be-disks \"HDD=1\", \"SSD=1,10\", \"SSD=2,100\""\
"means each be has 1 HDD without capactity limit, 1 SSD with 10GB capactity limit, "\
"2 SSD with 100GB capactity limit")
group1.add_argument(
"--be-cluster",
type=str,
help=
"be cluster name, if not specific, will use compute_cluster. Only use in cloud cluster."
)
self._add_parser_ids_args(parser)
group2 = parser.add_mutually_exclusive_group()
if self._support_boolean_action():
group2.add_argument(
"--start",
default=True,
action=self._get_parser_bool_action(False),
help="Start containers, default is true. If specific --no-start, "\
"will create or update config image only but not start containers.")
else:
group2.add_argument(
"--no-start",
dest='start',
default=True,
action=self._get_parser_bool_action(False),
help=
"Create or update config image only and don't start containers."
)
group2.add_argument("--force-recreate",
default=False,
action=self._get_parser_bool_action(True),
help="Recreate containers even if their configuration" \
"and image haven't changed. ")
parser.add_argument("--coverage-dir",
default="",
help="code coverage output directory")
parser.add_argument(
"--fdb-version",
type=str,
default="7.1.26",
help="fdb image version. Only use in cloud cluster.")
def run(self, args):
if not args.NAME:
raise Exception("Need specific not empty cluster name")
for_all = True
add_fdb_num = 0
try:
cluster = CLUSTER.Cluster.load(args.NAME)
if not cluster.is_cloud:
args.add_ms_num = None
args.add_recycle_num = None
args.ms_id = None
args.recycle_id = None
args.fdb_id = None
            if args.fe_id is not None or args.be_id is not None \
                    or args.ms_id is not None or args.recycle_id is not None or args.fdb_id is not None \
                    or args.add_fe_num or args.add_be_num \
                    or args.add_ms_num or args.add_recycle_num:
for_all = False
        except Exception:
            # a new cluster
            if not args.IMAGE:
                raise Exception("New cluster must specify an image") from None
            if args.fe_id is not None:
args.fe_id = None
LOG.warning(
utils.render_yellow("Ignore --fe-id for new cluster"))
            if args.be_id is not None:
args.be_id = None
LOG.warning(
utils.render_yellow("Ignore --be-id for new cluster"))
args.fdb_id = None
args.ms_id = None
args.recycle_id = None
if args.add_fe_num is None:
args.add_fe_num = 3
if args.add_be_num is None:
args.add_be_num = 3
cloud_store_config = {}
if args.cloud:
add_fdb_num = 1
if not args.add_ms_num:
args.add_ms_num = 1
if not args.add_recycle_num:
args.add_recycle_num = 1
if not args.be_cluster:
args.be_cluster = "compute_cluster"
cloud_store_config = self._get_cloud_store_config()
else:
args.add_ms_num = 0
args.add_recycle_num = 0
cluster = CLUSTER.Cluster.new(args.NAME, args.IMAGE, args.cloud,
args.fe_config, args.be_config,
args.be_disks, args.be_cluster,
args.coverage_dir,
cloud_store_config)
LOG.info("Create new cluster {} succ, cluster path is {}".format(
args.NAME, cluster.get_path()))
if args.be_cluster and cluster.is_cloud:
cluster.be_cluster = args.be_cluster
_, related_nodes, _ = get_ids_related_nodes(cluster, args.fe_id,
args.be_id, args.ms_id,
args.recycle_id,
args.fdb_id)
add_fe_ids = []
add_be_ids = []
add_ms_ids = []
add_recycle_ids = []
add_fdb_ids = []
add_type_nums = [
(CLUSTER.Node.TYPE_FDB, add_fdb_num, add_fdb_ids),
(CLUSTER.Node.TYPE_MS, args.add_ms_num, add_ms_ids),
(CLUSTER.Node.TYPE_RECYCLE, args.add_recycle_num, add_recycle_ids),
(CLUSTER.Node.TYPE_FE, args.add_fe_num, add_fe_ids),
(CLUSTER.Node.TYPE_BE, args.add_be_num, add_be_ids),
]
if not related_nodes:
related_nodes = []
def do_add_node(node_type, add_num, add_ids):
if not add_num:
return
for i in range(add_num):
node = cluster.add(node_type)
related_nodes.append(node)
add_ids.append(node.id)
for node_type, add_num, add_ids in add_type_nums:
do_add_node(node_type, add_num, add_ids)
if args.IMAGE:
for node in related_nodes:
node.set_image(args.IMAGE)
if for_all:
cluster.set_image(args.IMAGE)
for node in cluster.get_all_nodes(CLUSTER.Node.TYPE_FDB):
node.set_image("foundationdb/foundationdb:{}".format(
args.fdb_version))
cluster.save()
options = []
if not args.start:
options.append("--no-start")
else:
options = ["-d", "--remove-orphans"]
if args.force_recreate:
options.append("--force-recreate")
related_node_num = len(related_nodes)
if for_all:
related_node_num = cluster.get_all_nodes_num()
related_nodes = None
utils.exec_docker_compose_command(cluster.get_compose_file(), "up",
options, related_nodes)
ls_cmd = "python docker/runtime/doris-compose/doris-compose.py ls " + cluster.name
LOG.info("Inspect command: " + utils.render_green(ls_cmd) + "\n")
LOG.info(
"Master fe query address: " +
utils.render_green(CLUSTER.get_master_fe_endpoint(cluster.name)) +
"\n")
if not args.start:
LOG.info(
utils.render_green(
"Not up cluster cause specific --no-start, related node num {}"
.format(related_node_num)))
else:
if args.wait_timeout != 0:
if args.wait_timeout == -1:
args.wait_timeout = 1000000000
expire_ts = time.time() + args.wait_timeout
while True:
db_mgr = database.get_db_mgr(args.NAME, False)
dead_frontends = []
for id in add_fe_ids:
fe_state = db_mgr.get_fe(id)
if not fe_state or not fe_state.alive:
dead_frontends.append(id)
dead_backends = []
for id in add_be_ids:
be_state = db_mgr.get_be(id)
if not be_state or not be_state.alive:
dead_backends.append(id)
if not dead_frontends and not dead_backends:
break
if time.time() >= expire_ts:
err = ""
if dead_frontends:
err += "dead fe: " + str(dead_frontends) + ". "
if dead_backends:
err += "dead be: " + str(dead_backends) + ". "
raise Exception(err)
time.sleep(1)
LOG.info(
utils.render_green(
"Up cluster {} succ, related node num {}".format(
args.NAME, related_node_num)))
return {
"fe": {
"add_list": add_fe_ids,
},
"be": {
"add_list": add_be_ids,
},
"ms": {
"add_list": add_ms_ids,
},
"recycle": {
"add_list": add_recycle_ids,
},
"fdb": {
"add_list": add_fdb_ids,
},
}
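    # A sketch of what run() returns for a fresh non-cloud cluster created
    # with the defaults (the ids depend on actual cluster state, so treat this
    # as illustrative only):
    #   {"fe": {"add_list": [1, 2, 3]}, "be": {"add_list": [1, 2, 3]},
    #    "ms": {"add_list": []}, "recycle": {"add_list": []},
    #    "fdb": {"add_list": []}}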
def _get_cloud_store_config(self):
example_cfg_file = os.path.join(CLUSTER.LOCAL_RESOURCE_PATH,
"cloud.ini.example")
if not CLUSTER.CLOUD_CFG_FILE:
raise Exception("Cloud cluster need S3 store, specific its config in a file.\n" \
"A example file is " + example_cfg_file + ".\n" \
"Then setting the env variable `export DORIS_CLOUD_CFG_FILE=<cfg-file-path>`.")
if not os.path.exists(CLUSTER.CLOUD_CFG_FILE):
raise Exception("Cloud store config file '" +
CLUSTER.CLOUD_CFG_FILE + "' not exists.")
config = {}
with open(example_cfg_file, "r") as f:
for line in f.readlines():
if line.startswith('#'):
continue
pos = line.find('=')
if pos <= 0:
continue
key = line[0:pos].strip()
if key:
config[key] = ""
with open(CLUSTER.CLOUD_CFG_FILE, "r") as f:
for line in f.readlines():
if line.startswith('#'):
continue
pos = line.find('=')
if pos <= 0:
continue
key = line[0:pos].strip()
                if key and config.get(key) is not None:
config[key] = line[line.find('=') + 1:].strip()
for key, value in config.items():
if not value:
raise Exception(
"Should provide none empty property '{}' in file {}".
format(key, CLUSTER.CLOUD_CFG_FILE))
return config
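    # Parsing sketch for _get_cloud_store_config(): cloud.ini.example only
    # contributes the set of required keys, the user file contributes values.
    # A user line such as "some_key = some_value" (key names come from the
    # example file, none are hard-coded here) becomes
    # config["some_key"] = "some_value"; '#' comments and lines without '='
    # are skipped, and any key left empty raises.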
class DownCommand(Command):
def add_parser(self, args_parsers):
parser = args_parsers.add_parser("down",
help="Down doris containers, networks. "\
"It will also remove node from DB. " \
"If none of --fe-id, --be-id, --ms-id, --recycle-id, --fdb-id is specific, "\
"then apply to all containers.")
parser.add_argument("NAME", help="Specify cluster name")
self._add_parser_ids_args(parser)
self._add_parser_output_json(parser)
parser.add_argument(
"--clean",
default=False,
action=self._get_parser_bool_action(True),
help=
"Clean container related files, include expose data, config and logs."
)
parser.add_argument(
"--drop-force",
default=None,
action=self._get_parser_bool_action(True),
help="Drop doris node force. For be, if specific --drop-force, "\
"it will send dropp to fe, otherwise send decommission to fe.")
def run(self, args):
cluster = None
try:
cluster = CLUSTER.Cluster.load(args.NAME)
        except Exception:
            return "Cluster does not exist or failed to load"
for_all, related_nodes, related_node_num = get_ids_related_nodes(
cluster,
args.fe_id,
args.be_id,
args.ms_id,
args.recycle_id,
args.fdb_id,
ignore_not_exists=True)
if for_all:
if os.path.exists(cluster.get_compose_file()):
try:
utils.exec_docker_compose_command(
cluster.get_compose_file(), "down",
["-v", "--remove-orphans"])
except Exception as e:
LOG.warn("down cluster has exception: " + str(e))
try:
utils.remove_docker_network(cluster.name)
except Exception as e:
LOG.warn("remove network has exception: " + str(e))
if args.clean:
utils.enable_dir_with_rw_perm(cluster.get_path())
shutil.rmtree(cluster.get_path())
LOG.info(
utils.render_yellow(
"Clean cluster data cause has specific --clean"))
else:
db_mgr = database.get_db_mgr(cluster.name)
for node in related_nodes:
if node.is_fe():
fe_endpoint = "{}:{}".format(node.get_ip(),
CLUSTER.FE_EDITLOG_PORT)
db_mgr.drop_fe(fe_endpoint)
elif node.is_be():
be_endpoint = "{}:{}".format(node.get_ip(),
CLUSTER.BE_HEARTBEAT_PORT)
if args.drop_force:
db_mgr.drop_be(be_endpoint)
else:
db_mgr.decommission_be(be_endpoint)
else:
raise Exception("Unknown node type: {}".format(
node.node_type()))
#utils.exec_docker_compose_command(cluster.get_compose_file(),
# "stop",
# nodes=[node])
utils.exec_docker_compose_command(cluster.get_compose_file(),
"rm", ["-s", "-v", "-f"],
nodes=[node])
if args.clean:
utils.enable_dir_with_rw_perm(node.get_path())
shutil.rmtree(node.get_path())
register_file = "{}/{}-{}-register".format(
CLUSTER.get_status_path(cluster.name),
node.node_type(), node.get_ip())
if os.path.exists(register_file):
os.remove(register_file)
LOG.info(
utils.render_yellow(
"Clean {} with id {} data cause has specific --clean"
.format(node.node_type(), node.id)))
cluster.remove(node.node_type(), node.id)
cluster.save()
LOG.info(
utils.render_green(
"Down cluster {} succ, related node num {}".format(
args.NAME, related_node_num)))
return "down cluster succ"
class ListNode(object):
def __init__(self):
self.node_type = ""
self.id = 0
self.backend_id = ""
self.cluster_name = ""
self.ip = ""
self.status = ""
self.container_id = ""
self.image = ""
self.created = ""
self.alive = ""
self.is_master = ""
self.query_port = ""
self.tablet_num = ""
self.last_heartbeat = ""
self.err_msg = ""
def info(self, detail):
result = [
self.cluster_name, "{}-{}".format(self.node_type, self.id),
self.ip, self.status, self.container_id, self.image, self.created,
self.alive, self.is_master, self.backend_id, self.tablet_num,
self.last_heartbeat, self.err_msg
]
if detail:
query_port = ""
http_port = ""
if self.node_type == CLUSTER.Node.TYPE_FE:
query_port = CLUSTER.FE_QUERY_PORT
http_port = CLUSTER.FE_HTTP_PORT
elif self.node_type == CLUSTER.Node.TYPE_BE:
http_port = CLUSTER.BE_WEBSVR_PORT
else:
pass
result += [
query_port,
http_port,
]
return result
def update_db_info(self, db_mgr):
if self.node_type == CLUSTER.Node.TYPE_FE:
fe = db_mgr.get_fe(self.id)
if fe:
self.alive = str(fe.alive).lower()
self.is_master = str(fe.is_master).lower()
self.query_port = fe.query_port
self.last_heartbeat = fe.last_heartbeat
self.err_msg = fe.err_msg
elif self.node_type == CLUSTER.Node.TYPE_BE:
self.backend_id = -1
be = db_mgr.get_be(self.id)
if be:
self.alive = str(be.alive).lower()
self.backend_id = be.backend_id
self.tablet_num = be.tablet_num
self.last_heartbeat = be.last_heartbeat
self.err_msg = be.err_msg
class GenConfCommand(Command):
def print_use_time(self):
return False
def add_parser(self, args_parsers):
parser = args_parsers.add_parser(
"config",
help="Generate regression-conf-custom.groovy for regression test.")
parser.add_argument("NAME", default="", help="Specific cluster name.")
return parser
def run(self, args):
content = '''
jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true"
targetJdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true"
feSourceThriftAddress = "127.0.0.1:9020"
feTargetThriftAddress = "127.0.0.1:9020"
syncerAddress = "127.0.0.1:9190"
feHttpAddress = "127.0.0.1:8030"
'''
master_fe_ip = CLUSTER.get_master_fe_endpoint(args.NAME)
if not master_fe_ip:
print("Not found cluster with name {} in directory {}".format(
args.NAME, CLUSTER.LOCAL_DORIS_PATH))
return
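        # __file__ is <doris-root>/docker/runtime/doris-compose/command.py, so
        # four dirname() calls climb from the file up to the repository root.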
doris_root_dir = os.path.abspath(__file__)
for i in range(4):
doris_root_dir = os.path.dirname(doris_root_dir)
regression_conf_custom = doris_root_dir + "/regression-test/conf/regression-conf-custom.groovy"
if input("write file {} ?\n y/N: ".format(
regression_conf_custom)) != 'y':
print("No write regression custom file.")
return
with open(regression_conf_custom, "w") as f:
f.write(
content.replace("127.0.0.1",
master_fe_ip[:master_fe_ip.find(':')]))
print("Write succ: " + regression_conf_custom)
class ListCommand(Command):
def add_parser(self, args_parsers):
parser = args_parsers.add_parser(
"ls", help="List running doris compose clusters.")
parser.add_argument(
"NAME",
nargs="*",
help=
"Specify multiple clusters, if specific, show all their containers."
)
self._add_parser_output_json(parser)
parser.add_argument("--detail",
default=False,
action=self._get_parser_bool_action(True),
help="Print more detail fields.")
def _handle_data(self, header, datas):
if utils.is_enable_log():
table = prettytable.PrettyTable(
[utils.render_green(field) for field in header])
for row in datas:
table.add_row(row)
print(table)
return ""
else:
datas.insert(0, header)
return datas
def run(self, args):
COMPOSE_MISSING = "(missing)"
COMPOSE_BAD = "(bad)"
COMPOSE_GOOD = ""
SERVICE_DEAD = "dead"
class ComposeService(object):
def __init__(self, name, ip, image):
self.name = name
self.ip = ip
self.image = image
def parse_cluster_compose_file(cluster_name):
compose_file = CLUSTER.get_compose_file(cluster_name)
if not os.path.exists(compose_file):
return COMPOSE_MISSING, {}
try:
compose = utils.read_compose_file(compose_file)
if not compose:
return COMPOSE_BAD, {}
services = compose.get("services", {})
if services is None:
return COMPOSE_BAD, {}
return COMPOSE_GOOD, {
service:
ComposeService(
service,
list(service_conf["networks"].values())[0]
["ipv4_address"], service_conf["image"])
for service, service_conf in services.items()
}
            except Exception:
                return COMPOSE_BAD, {}
clusters = {}
search_names = []
if args.NAME:
search_names = args.NAME
else:
search_names = CLUSTER.get_all_cluster_names()
for cluster_name in search_names:
status, services = parse_cluster_compose_file(cluster_name)
clusters[cluster_name] = {"status": status, "services": services}
docker_clusters = utils.get_doris_containers(args.NAME)
for cluster_name, containers in docker_clusters.items():
cluster_info = clusters.get(cluster_name, None)
if not cluster_info:
cluster_info = {"status": COMPOSE_MISSING, "services": {}}
clusters[cluster_name] = cluster_info
for container in containers:
#if container.status == "running" and cluster_info[
# "status"] == COMPOSE_GOOD and (
# container.name not in cluster_info["services"]):
# container.status = "orphans"
cluster_info["services"][container.name] = container
        # ComposeService instances stand in for nodes that appear in the
        # compose file but have no running docker container.
        TYPE_COMPOSESERVICE = ComposeService
if not args.NAME:
header = ("CLUSTER", "OWNER", "STATUS", "MASTER FE", "CLOUD",
"CONFIG FILES")
rows = []
for name in sorted(clusters.keys()):
cluster_info = clusters[name]
service_statuses = {}
for _, container in cluster_info["services"].items():
status = SERVICE_DEAD if type(
container) == TYPE_COMPOSESERVICE else container.status
service_statuses[status] = service_statuses.get(status,
0) + 1
show_status = ",".join([
"{}({})".format(status, count)
for status, count in service_statuses.items()
])
owner = utils.get_path_owner(CLUSTER.get_cluster_path(name))
compose_file = CLUSTER.get_compose_file(name)
is_cloud = ""
try:
cluster = CLUSTER.Cluster.load(name)
is_cloud = "true" if cluster.is_cloud else "false"
                except Exception:
                    pass
rows.append((name, owner, show_status,
CLUSTER.get_master_fe_endpoint(name), is_cloud,
"{}{}".format(compose_file,
cluster_info["status"])))
return self._handle_data(header, rows)
header = [
"CLUSTER", "NAME", "IP", "STATUS", "CONTAINER ID", "IMAGE",
"CREATED", "alive", "is_master", "backend_id", "tablet_num",
"last_heartbeat", "err_msg"
]
if args.detail:
header += [
"query_port",
"http_port",
]
rows = []
for cluster_name in sorted(clusters.keys()):
fe_ids = {}
be_ids = {}
services = clusters[cluster_name]["services"]
db_mgr = database.get_db_mgr(cluster_name, False)
nodes = []
for service_name, container in services.items():
_, node_type, id = utils.parse_service_name(container.name)
node = ListNode()
node.cluster_name = cluster_name
node.node_type = node_type
node.id = id
node.update_db_info(db_mgr)
nodes.append(node)
if node_type == CLUSTER.Node.TYPE_FE:
fe_ids[id] = True
elif node_type == CLUSTER.Node.TYPE_BE:
be_ids[id] = True
if type(container) == TYPE_COMPOSESERVICE:
node.ip = container.ip
node.image = container.image
node.status = SERVICE_DEAD
else:
node.created = dateutil.parser.parse(
container.attrs.get("Created")).astimezone().strftime(
"%Y-%m-%d %H:%M:%S")
node.ip = list(
container.attrs["NetworkSettings"]
["Networks"].values())[0]["IPAMConfig"]["IPv4Address"]
node.image = ",".join(container.image.tags)
node.container_id = container.short_id
node.status = container.status
if node.container_id and \
node_type in (CLUSTER.Node.TYPE_FDB,
CLUSTER.Node.TYPE_MS,
CLUSTER.Node.TYPE_RECYCLE):
node.alive = "true"
for id, fe in db_mgr.fe_states.items():
if fe_ids.get(id, False):
continue
node = ListNode()
node.cluster_name = cluster_name
node.node_type = CLUSTER.Node.TYPE_FE
node.id = id
node.status = SERVICE_DEAD
node.update_db_info(db_mgr)
nodes.append(node)
for id, be in db_mgr.be_states.items():
if be_ids.get(id, False):
continue
node = ListNode()
node.cluster_name = cluster_name
node.node_type = CLUSTER.Node.TYPE_BE
node.id = id
node.status = SERVICE_DEAD
node.update_db_info(db_mgr)
nodes.append(node)
def get_node_seq(node):
return CLUSTER.get_node_seq(node.node_type, node.id)
for node in sorted(nodes, key=get_node_seq):
rows.append(node.info(args.detail))
return self._handle_data(header, rows)
ALL_COMMANDS = [
UpCommand("up"),
DownCommand("down"),
SimpleCommand("start", "Start the doris containers. "),
SimpleCommand("stop", "Stop the doris containers. "),
SimpleCommand("restart", "Restart the doris containers. "),
SimpleCommand("pause", "Pause the doris containers. "),
SimpleCommand("unpause", "Unpause the doris containers. "),
GenConfCommand("config"),
ListCommand("ls"),
]
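# doris-compose.py (the entry point beside this file) is expected to iterate
# ALL_COMMANDS, call add_parser() on each to build the CLI, then dispatch to
# the matching command's run(args).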