# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import argparse
import cluster as CLUSTER
import database
import dateutil.parser
import utils
import os
import os.path
import prettytable
import shutil
import sys
import time

LOG = utils.get_logger()


def get_ids_related_nodes(cluster,
                          fe_ids,
                          be_ids,
                          ms_ids,
                          recycle_ids,
                          fdb_ids,
                          ignore_not_exists=False):
    """Return (for_all, related_nodes, related_node_num).

    If no ids are given at all, the command applies to the whole cluster:
    for_all is True and related_nodes is None.
    """
    if fe_ids is None and be_ids is None and ms_ids is None \
            and recycle_ids is None and fdb_ids is None:
        return True, None, cluster.get_all_nodes_num()

    def get_ids_related_nodes_with_type(node_type, ids):
        if ids is None:
            return []
        if not ids:
            return cluster.get_all_nodes(node_type)
        nodes = []
        for id in ids:
            try:
                nodes.append(cluster.get_node(node_type, id))
            except Exception as e:
                if ignore_not_exists:
                    LOG.warning(
                        utils.render_yellow("No {} found with id {}".format(
                            node_type, id)))
                else:
                    raise e
        return nodes

    type_ids = [
        (CLUSTER.Node.TYPE_FE, fe_ids),
        (CLUSTER.Node.TYPE_BE, be_ids),
        (CLUSTER.Node.TYPE_MS, ms_ids),
        (CLUSTER.Node.TYPE_RECYCLE, recycle_ids),
        (CLUSTER.Node.TYPE_FDB, fdb_ids),
    ]

    nodes = []
    for node_type, ids in type_ids:
        nodes.extend(get_ids_related_nodes_with_type(node_type, ids))

    related_node_num = len(nodes)
    for_all = related_node_num == cluster.get_all_nodes_num()

    return for_all, nodes, related_node_num


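# Base class for doris-compose subcommands: each subclass registers its CLI
# arguments via add_parser() and implements run().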
class Command(object):

    def __init__(self, name):
        self.name = name

    def print_use_time(self):
        return True

    def add_parser(self, args_parsers):
        raise Exception("Not implemented")

    def run(self, args):
        raise Exception("Not implemented")

    def _add_parser_output_json(self, parser):
        parser.add_argument("--output-json",
                            default=False,
                            action=self._get_parser_bool_action(True),
                            help="Output as json, and don't print log.")

    def _add_parser_ids_args(self, parser):
        group = parser.add_argument_group("for existing nodes",
                                          "apply to the existing nodes.")
        group.add_argument("--fe-id", nargs="*", type=int, help="Specify fe ids, support multiple ids. " \
                "If --fe-id is given without ids, apply to all fe. Example: '--fe-id 2 3' will select fe-2 and fe-3.")
        group.add_argument("--be-id", nargs="*", type=int, help="Specify be ids, support multiple ids. " \
                "If --be-id is given without ids, apply to all be. Example: '--be-id' will select all backends.")
        group.add_argument(
            "--ms-id",
            nargs="*",
            type=int,
            help=
            "Specify ms ids, support multiple ids. Only used in cloud clusters."
        )
        group.add_argument(
            "--recycle-id",
            nargs="*",
            type=int,
            help=
            "Specify recycle ids, support multiple ids. Only used in cloud clusters."
        )
        group.add_argument(
            "--fdb-id",
            nargs="*",
            type=int,
            help=
            "Specify fdb ids, support multiple ids. Only used in cloud clusters."
        )

    def _get_parser_bool_action(self, is_store_true):
        if self._support_boolean_action():
            return argparse.BooleanOptionalAction
        else:
            return "store_true" if is_store_true else "store_false"

    def _support_boolean_action(self):
        # argparse.BooleanOptionalAction was added in python 3.9
        return sys.version_info.major == 3 and sys.version_info.minor >= 9


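# A thin wrapper that forwards a single docker compose verb (start, stop,
# restart, pause, unpause) to the selected containers.
# Example (a sketch): `start my_cluster --be-id 1 2` starts only be-1 and be-2.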
class SimpleCommand(Command):

    def __init__(self, command, help):
        super().__init__(command)
        self.command = command
        self.help = help

    def add_parser(self, args_parsers):
        help = self.help + " If none of --fe-id, --be-id, --ms-id, --recycle-id, --fdb-id is specified, "\
            "then apply to all containers."
        parser = args_parsers.add_parser(self.command, help=help)
        parser.add_argument("NAME", help="Specify cluster name.")
        self._add_parser_ids_args(parser)
        self._add_parser_output_json(parser)

    def run(self, args):
        cluster = CLUSTER.Cluster.load(args.NAME)
        _, related_nodes, related_node_num = get_ids_related_nodes(
            cluster, args.fe_id, args.be_id, args.ms_id, args.recycle_id,
            args.fdb_id)
        utils.exec_docker_compose_command(cluster.get_compose_file(),
                                          self.command,
                                          nodes=related_nodes)
        show_cmd = self.command[0].upper() + self.command[1:]
        LOG.info(
            utils.render_green(
                "{} succeeded, total related node num {}".format(
                    show_cmd, related_node_num)))


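# Typical invocations (sketches; the image name is an example, not a default):
#   up my_cluster apache/doris:latest          create a new 3 fe + 3 be cluster
#   up my_cluster --add-be-num 2               add two be to an existing cluster
#   up my_cluster apache/doris:latest --cloud  create a cloud cluster with fdb/ms/recycle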
class UpCommand(Command):

    def add_parser(self, args_parsers):
        parser = args_parsers.add_parser("up", help="Create and upgrade doris containers, "\
                "or add new containers. " \
                "If none of --add-fe-num, --add-be-num, --add-ms-num, --add-recycle-num, "\
                "--fe-id, --be-id, --ms-id, --recycle-id, --fdb-id is specified, " \
                "then apply to all containers.")
        parser.add_argument("NAME", default="", help="Specify cluster name.")
        parser.add_argument("IMAGE",
                            default="",
                            nargs="?",
                            help="Specify docker image.")

        parser.add_argument(
            "--cloud",
            default=False,
            action=self._get_parser_bool_action(True),
            help=
            "Create a cloud cluster, default is false. Only used when creating a new cluster."
        )

        parser.add_argument(
            "--wait-timeout",
            type=int,
            default=0,
            help=
            "Specify the seconds to wait for fe/be to be ready for service: 0 not wait (default), "\
            "> 0 max wait seconds, -1 wait unlimited."
        )

        self._add_parser_output_json(parser)

        group1 = parser.add_argument_group("add new nodes",
                                           "add cluster nodes.")
        group1.add_argument(
            "--add-fe-num",
            type=int,
            help=
            "Specify add fe num, default: 3 for a new cluster, 0 for an existing cluster."
        )
        group1.add_argument(
            "--add-be-num",
            type=int,
            help=
            "Specify add be num, default: 3 for a new cluster, 0 for an existing cluster."
        )
        group1.add_argument(
            "--add-ms-num",
            type=int,
            help=
            "Specify add ms num, default: 1 for a new cloud cluster, 0 for an existing cluster. Only used in cloud clusters."
        )
        group1.add_argument(
            "--add-recycle-num",
            type=int,
            help=
            "Specify add recycle num, default: 1 for a new cloud cluster, 0 for an existing cluster. Only used in cloud clusters."
        )
        group1.add_argument("--fe-config",
                            nargs="*",
                            type=str,
                            help="Specify fe configs for fe.conf. "\
                            "Example: --fe-config \"enable_debug_points = true\" \"sys_log_level = ERROR\".")
        group1.add_argument("--be-config",
                            nargs="*",
                            type=str,
                            help="Specify be configs for be.conf. "\
                            "Example: --be-config \"enable_debug_points = true\" \"enable_auth = true\".")
        group1.add_argument("--ms-config",
                            nargs="*",
                            type=str,
                            help="Specify ms configs for doris_cloud.conf. "\
                            "Example: --ms-config \"log_level = warn\".")
        group1.add_argument("--recycle-config",
                            nargs="*",
                            type=str,
                            help="Specify recycle configs for doris_cloud.conf. "\
                            "Example: --recycle-config \"log_level = warn\".")
        group1.add_argument("--be-disks",
                            nargs="*",
                            default=["HDD=1"],
                            type=str,
                            help="Specify each be's disks, each group is \"disk_type=disk_num[,disk_capacity]\", "\
                            "disk_type is HDD or SSD, disk_capacity is the capacity limit in GB. Default: HDD=1. "\
                            "Example: --be-disks \"HDD=1\" \"SSD=1,10\" \"SSD=2,100\" "\
                            "means each be has 1 HDD without capacity limit, 1 SSD with a 10GB capacity limit, "\
                            "and 2 SSD with a 100GB capacity limit.")
        group1.add_argument(
            "--be-cluster",
            type=str,
            help=
            "be cluster name, if not specified, will use compute_cluster. Only used in cloud clusters."
        )

        self._add_parser_ids_args(parser)

        group2 = parser.add_mutually_exclusive_group()
        if self._support_boolean_action():
            group2.add_argument(
                "--start",
                default=True,
                action=self._get_parser_bool_action(False),
                help="Start containers, default is true. If --no-start is specified, "\
                "will create or update config and image only but not start containers.")
        else:
            group2.add_argument(
                "--no-start",
                dest='start',
                default=True,
                action=self._get_parser_bool_action(False),
                help=
                "Create or update config and image only and don't start containers."
            )
        group2.add_argument("--force-recreate",
                            default=False,
                            action=self._get_parser_bool_action(True),
                            help="Recreate containers even if their configuration "\
                            "and image haven't changed.")

        parser.add_argument("--coverage-dir",
                            default="",
                            help="Code coverage output directory.")

        parser.add_argument(
            "--fdb-version",
            type=str,
            default="7.1.26",
            help="fdb image version. Only used in cloud clusters.")

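    # Loads the cluster if it exists, otherwise creates a new one (IMAGE is
    # then mandatory), adds the requested nodes, and runs `docker compose up`.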
    def run(self, args):
        if not args.NAME:
            raise Exception("Need a non-empty cluster name")
        for_all = True
        add_fdb_num = 0
        try:
            cluster = CLUSTER.Cluster.load(args.NAME)

            if not cluster.is_cloud:
                args.add_ms_num = None
                args.add_recycle_num = None
                args.ms_id = None
                args.recycle_id = None
                args.fdb_id = None

            if args.fe_id is not None or args.be_id is not None \
                    or args.ms_id is not None or args.recycle_id is not None or args.fdb_id is not None \
                    or args.add_fe_num or args.add_be_num \
                    or args.add_ms_num or args.add_recycle_num:
                for_all = False
        except Exception:
            # a new cluster
            if not args.IMAGE:
                raise Exception(
                    "A new cluster must specify an image") from None
            if args.fe_id is not None:
                args.fe_id = None
                LOG.warning(
                    utils.render_yellow("Ignore --fe-id for new cluster"))
            if args.be_id is not None:
                args.be_id = None
                LOG.warning(
                    utils.render_yellow("Ignore --be-id for new cluster"))

            args.fdb_id = None
            args.ms_id = None
            args.recycle_id = None

            if args.add_fe_num is None:
                args.add_fe_num = 3
            if args.add_be_num is None:
                args.add_be_num = 3

            cloud_store_config = {}
            if args.cloud:
                add_fdb_num = 1
                if not args.add_ms_num:
                    args.add_ms_num = 1
                if not args.add_recycle_num:
                    args.add_recycle_num = 1
                if not args.be_cluster:
                    args.be_cluster = "compute_cluster"
                cloud_store_config = self._get_cloud_store_config()
            else:
                args.add_ms_num = 0
                args.add_recycle_num = 0

            cluster = CLUSTER.Cluster.new(args.NAME, args.IMAGE, args.cloud,
                                          args.fe_config, args.be_config,
                                          args.ms_config, args.recycle_config,
                                          args.be_disks, args.be_cluster,
                                          args.coverage_dir,
                                          cloud_store_config)
            LOG.info(
                "Create new cluster {} succeeded, cluster path is {}".format(
                    args.NAME, cluster.get_path()))

        if args.be_cluster and cluster.is_cloud:
            cluster.be_cluster = args.be_cluster

        _, related_nodes, _ = get_ids_related_nodes(cluster, args.fe_id,
                                                    args.be_id, args.ms_id,
                                                    args.recycle_id,
                                                    args.fdb_id)
        add_fe_ids = []
        add_be_ids = []
        add_ms_ids = []
        add_recycle_ids = []
        add_fdb_ids = []

        add_type_nums = [
            (CLUSTER.Node.TYPE_FDB, add_fdb_num, add_fdb_ids),
            (CLUSTER.Node.TYPE_MS, args.add_ms_num, add_ms_ids),
            (CLUSTER.Node.TYPE_RECYCLE, args.add_recycle_num, add_recycle_ids),
            (CLUSTER.Node.TYPE_FE, args.add_fe_num, add_fe_ids),
            (CLUSTER.Node.TYPE_BE, args.add_be_num, add_be_ids),
        ]

        if not related_nodes:
            related_nodes = []

        def do_add_node(node_type, add_num, add_ids):
            if not add_num:
                return
            for i in range(add_num):
                node = cluster.add(node_type)
                related_nodes.append(node)
                add_ids.append(node.id)

        for node_type, add_num, add_ids in add_type_nums:
            do_add_node(node_type, add_num, add_ids)

        if args.IMAGE:
            for node in related_nodes:
                node.set_image(args.IMAGE)
            if for_all:
                cluster.set_image(args.IMAGE)

        for node in cluster.get_all_nodes(CLUSTER.Node.TYPE_FDB):
            node.set_image("foundationdb/foundationdb:{}".format(
                args.fdb_version))

        cluster.save()

        options = []
        if not args.start:
            options.append("--no-start")
        else:
            options = ["-d", "--remove-orphans"]
            if args.force_recreate:
                options.append("--force-recreate")

        related_node_num = len(related_nodes)
        if for_all:
            related_node_num = cluster.get_all_nodes_num()
            related_nodes = None

        utils.exec_docker_compose_command(cluster.get_compose_file(), "up",
                                          options, related_nodes)

        ls_cmd = "python docker/runtime/doris-compose/doris-compose.py ls " + cluster.name
        LOG.info("Inspect command: " + utils.render_green(ls_cmd) + "\n")
        LOG.info(
            "Master fe query address: " +
            utils.render_green(CLUSTER.get_master_fe_endpoint(cluster.name)) +
            "\n")

        if not args.start:
            LOG.info(
                utils.render_green(
                    "Cluster not started because --no-start is specified, related node num {}"
                    .format(related_node_num)))
        else:
            if args.wait_timeout != 0:
                if args.wait_timeout == -1:
                    args.wait_timeout = 1000000000
                expire_ts = time.time() + args.wait_timeout
                while True:
                    db_mgr = database.get_db_mgr(args.NAME, False)
                    dead_frontends = []
                    for id in add_fe_ids:
                        fe_state = db_mgr.get_fe(id)
                        if not fe_state or not fe_state.alive:
                            dead_frontends.append(id)
                    dead_backends = []
                    for id in add_be_ids:
                        be_state = db_mgr.get_be(id)
                        if not be_state or not be_state.alive:
                            dead_backends.append(id)
                    if not dead_frontends and not dead_backends:
                        break
                    if time.time() >= expire_ts:
                        err = ""
                        if dead_frontends:
                            err += "dead fe: " + str(dead_frontends) + ". "
                        if dead_backends:
                            err += "dead be: " + str(dead_backends) + ". "
                        raise Exception(err)
                    time.sleep(1)
            LOG.info(
                utils.render_green(
                    "Up cluster {} succeeded, related node num {}".format(
                        args.NAME, related_node_num)))

        return {
            "fe": {
                "add_list": add_fe_ids,
            },
            "be": {
                "add_list": add_be_ids,
            },
            "ms": {
                "add_list": add_ms_ids,
            },
            "recycle": {
                "add_list": add_recycle_ids,
            },
            "fdb": {
                "add_list": add_fdb_ids,
            },
        }

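    # Cloud mode needs an S3-compatible store: keys are taken from the
    # cloud.ini.example under CLUSTER.LOCAL_RESOURCE_PATH, values from the
    # file named by the DORIS_CLOUD_CFG_FILE env variable; every key must end
    # up non-empty.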
    def _get_cloud_store_config(self):
        example_cfg_file = os.path.join(CLUSTER.LOCAL_RESOURCE_PATH,
                                        "cloud.ini.example")
        if not CLUSTER.CLOUD_CFG_FILE:
            raise Exception("A cloud cluster needs an S3 store, specify its config in a file.\n" \
                    "An example file is " + example_cfg_file + ".\n" \
                    "Then set the env variable `export DORIS_CLOUD_CFG_FILE=<cfg-file-path>`.")

        if not os.path.exists(CLUSTER.CLOUD_CFG_FILE):
            raise Exception("Cloud store config file '" +
                            CLUSTER.CLOUD_CFG_FILE + "' does not exist.")

        config = {}
        with open(example_cfg_file, "r") as f:
            for line in f.readlines():
                if line.startswith('#'):
                    continue
                pos = line.find('=')
                if pos <= 0:
                    continue
                key = line[0:pos].strip()
                if key:
                    config[key] = ""

        with open(CLUSTER.CLOUD_CFG_FILE, "r") as f:
            for line in f.readlines():
                if line.startswith('#'):
                    continue
                pos = line.find('=')
                if pos <= 0:
                    continue
                key = line[0:pos].strip()
                if key and config.get(key, None) is not None:
                    config[key] = line[pos + 1:].strip()

        for key, value in config.items():
            if not value:
                raise Exception(
                    "Should provide a non-empty property '{}' in file {}".
                    format(key, CLUSTER.CLOUD_CFG_FILE))
        return config


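# Example (a sketch): `down my_cluster --be-id 3 --clean` decommissions be-3,
# removes its container, and wipes its local data directory.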
class DownCommand(Command):

    def add_parser(self, args_parsers):
        parser = args_parsers.add_parser("down",
                                         help="Down doris containers, networks. "\
                "It will also remove the node from the DB. " \
                "If none of --fe-id, --be-id, --ms-id, --recycle-id, --fdb-id is specified, "\
                "then apply to all containers.")
        parser.add_argument("NAME", help="Specify cluster name.")
        self._add_parser_ids_args(parser)
        self._add_parser_output_json(parser)
        parser.add_argument(
            "--clean",
            default=False,
            action=self._get_parser_bool_action(True),
            help=
            "Clean container related files, including exposed data, config and logs."
        )
        parser.add_argument(
            "--drop-force",
            default=None,
            action=self._get_parser_bool_action(True),
            help="Drop the doris node forcefully. For be, if --drop-force is specified, "\
            "it will send DROPP to fe, otherwise it sends DECOMMISSION to fe.")

    def run(self, args):
        cluster = None
        try:
            cluster = CLUSTER.Cluster.load(args.NAME)
        except Exception:
            return "Cluster does not exist or failed to load"
        for_all, related_nodes, related_node_num = get_ids_related_nodes(
            cluster,
            args.fe_id,
            args.be_id,
            args.ms_id,
            args.recycle_id,
            args.fdb_id,
            ignore_not_exists=True)

        if for_all:
            if os.path.exists(cluster.get_compose_file()):
                try:
                    utils.exec_docker_compose_command(
                        cluster.get_compose_file(), "down",
                        ["-v", "--remove-orphans"])
                except Exception as e:
                    LOG.warning("down cluster has exception: " + str(e))
            try:
                utils.remove_docker_network(cluster.name)
            except Exception as e:
                LOG.warning("remove network has exception: " + str(e))
            if args.clean:
                utils.enable_dir_with_rw_perm(cluster.get_path())
                shutil.rmtree(cluster.get_path())
                LOG.info(
                    utils.render_yellow(
                        "Cleaned cluster data because --clean is specified"))
        else:
            db_mgr = database.get_db_mgr(cluster.name)

            for node in related_nodes:
                if node.is_fe():
                    fe_endpoint = "{}:{}".format(node.get_ip(),
                                                 CLUSTER.FE_EDITLOG_PORT)
                    db_mgr.drop_fe(fe_endpoint)
                elif node.is_be():
                    be_endpoint = "{}:{}".format(node.get_ip(),
                                                 CLUSTER.BE_HEARTBEAT_PORT)
                    if args.drop_force:
                        db_mgr.drop_be(be_endpoint)
                    else:
                        db_mgr.decommission_be(be_endpoint)
                else:
                    raise Exception("Unknown node type: {}".format(
                        node.node_type()))

                utils.exec_docker_compose_command(cluster.get_compose_file(),
                                                  "rm", ["-s", "-v", "-f"],
                                                  nodes=[node])
                if args.clean:
                    utils.enable_dir_with_rw_perm(node.get_path())
                    shutil.rmtree(node.get_path())
                    register_file = "{}/{}-{}-register".format(
                        CLUSTER.get_status_path(cluster.name),
                        node.node_type(), node.get_ip())
                    if os.path.exists(register_file):
                        os.remove(register_file)
                    LOG.info(
                        utils.render_yellow(
                            "Cleaned {} with id {} data because --clean is specified"
                            .format(node.node_type(), node.id)))

                cluster.remove(node.node_type(), node.id)
                cluster.save()

        LOG.info(
            utils.render_green(
                "Down cluster {} succeeded, related node num {}".format(
                    args.NAME, related_node_num)))

        return "down cluster succeeded"


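# Row model for `ls` output: one instance per container, or per node that is
# still registered in the FE but has no container.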
class ListNode(object):

    def __init__(self):
        self.node_type = ""
        self.id = 0
        self.backend_id = ""
        self.cluster_name = ""
        self.ip = ""
        self.status = ""
        self.container_id = ""
        self.image = ""
        self.created = ""
        self.alive = ""
        self.is_master = ""
        self.query_port = ""
        self.tablet_num = ""
        self.last_heartbeat = ""
        self.err_msg = ""

    def info(self, detail):
        result = [
            self.cluster_name, "{}-{}".format(self.node_type, self.id),
            self.ip, self.status, self.container_id, self.image, self.created,
            self.alive, self.is_master, self.backend_id, self.tablet_num,
            self.last_heartbeat, self.err_msg
        ]
        if detail:
            query_port = ""
            http_port = ""
            if self.node_type == CLUSTER.Node.TYPE_FE:
                query_port = CLUSTER.FE_QUERY_PORT
                http_port = CLUSTER.FE_HTTP_PORT
            elif self.node_type == CLUSTER.Node.TYPE_BE:
                http_port = CLUSTER.BE_WEBSVR_PORT
            result += [
                query_port,
                http_port,
            ]
        return result

    def update_db_info(self, db_mgr):
        if self.node_type == CLUSTER.Node.TYPE_FE:
            fe = db_mgr.get_fe(self.id)
            if fe:
                self.alive = str(fe.alive).lower()
                self.is_master = str(fe.is_master).lower()
                self.query_port = fe.query_port
                self.last_heartbeat = fe.last_heartbeat
                self.err_msg = fe.err_msg
        elif self.node_type == CLUSTER.Node.TYPE_BE:
            self.backend_id = -1
            be = db_mgr.get_be(self.id)
            if be:
                self.alive = str(be.alive).lower()
                self.backend_id = be.backend_id
                self.tablet_num = be.tablet_num
                self.last_heartbeat = be.last_heartbeat
                self.err_msg = be.err_msg


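# Writes regression-test/conf/regression-conf-custom.groovy pointing at the
# cluster's master fe, after asking for confirmation.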
class GenConfCommand(Command):

    def print_use_time(self):
        return False

    def add_parser(self, args_parsers):
        parser = args_parsers.add_parser(
            "config",
            help="Generate regression-conf-custom.groovy for regression test.")
        parser.add_argument("NAME", default="", help="Specify cluster name.")

        return parser

    def run(self, args):
        content = '''
jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true"
targetJdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true"
feSourceThriftAddress = "127.0.0.1:9020"
feTargetThriftAddress = "127.0.0.1:9020"
syncerAddress = "127.0.0.1:9190"
feHttpAddress = "127.0.0.1:8030"
'''
        master_fe_ip = CLUSTER.get_master_fe_endpoint(args.NAME)
        if not master_fe_ip:
            print("No cluster with name {} found in directory {}".format(
                args.NAME, CLUSTER.LOCAL_DORIS_PATH))
            return
        # this script sits 4 directory levels below the doris root
        doris_root_dir = os.path.abspath(__file__)
        for i in range(4):
            doris_root_dir = os.path.dirname(doris_root_dir)
        regression_conf_custom = doris_root_dir + "/regression-test/conf/regression-conf-custom.groovy"
        if input("write file {} ?\n y/N: ".format(
                regression_conf_custom)) != 'y':
            print("Skip writing the regression custom file.")
            return
        with open(regression_conf_custom, "w") as f:
            f.write(
                content.replace("127.0.0.1",
                                master_fe_ip[:master_fe_ip.find(':')]))
        print("Write succeeded: " + regression_conf_custom)


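# With no NAME, `ls` prints one summary row per cluster; with cluster names it
# prints one row per node, merged with live fe/be state from the database.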
class ListCommand(Command):

    def add_parser(self, args_parsers):
        parser = args_parsers.add_parser(
            "ls", help="List running doris compose clusters.")
        parser.add_argument(
            "NAME",
            nargs="*",
            help=
            "Specify multiple clusters. If specified, show all their containers."
        )
        self._add_parser_output_json(parser)
        parser.add_argument("--detail",
                            default=False,
                            action=self._get_parser_bool_action(True),
                            help="Print more detail fields.")

    def _handle_data(self, header, datas):
        if utils.is_enable_log():
            table = prettytable.PrettyTable(
                [utils.render_green(field) for field in header])
            for row in datas:
                table.add_row(row)
            print(table)
            return ""
        else:
            datas.insert(0, header)
            return datas

    def run(self, args):
        COMPOSE_MISSING = "(missing)"
        COMPOSE_BAD = "(bad)"
        COMPOSE_GOOD = ""

        SERVICE_DEAD = "dead"

        class ComposeService(object):

            def __init__(self, name, ip, image):
                self.name = name
                self.ip = ip
                self.image = image

        def parse_cluster_compose_file(cluster_name):
            compose_file = CLUSTER.get_compose_file(cluster_name)
            if not os.path.exists(compose_file):
                return COMPOSE_MISSING, {}
            try:
                compose = utils.read_compose_file(compose_file)
                if not compose:
                    return COMPOSE_BAD, {}
                services = compose.get("services", {})
                if services is None:
                    return COMPOSE_BAD, {}
                return COMPOSE_GOOD, {
                    service:
                    ComposeService(
                        service,
                        list(service_conf["networks"].values())[0]
                        ["ipv4_address"], service_conf["image"])
                    for service, service_conf in services.items()
                }
            except Exception:
                return COMPOSE_BAD, {}

        clusters = {}
        search_names = []
        if args.NAME:
            search_names = args.NAME
        else:
            search_names = CLUSTER.get_all_cluster_names()

        for cluster_name in search_names:
            status, services = parse_cluster_compose_file(cluster_name)
            clusters[cluster_name] = {"status": status, "services": services}

        docker_clusters = utils.get_doris_containers(args.NAME)
        for cluster_name, containers in docker_clusters.items():
            cluster_info = clusters.get(cluster_name, None)
            if not cluster_info:
                cluster_info = {"status": COMPOSE_MISSING, "services": {}}
                clusters[cluster_name] = cluster_info
            for container in containers:
                cluster_info["services"][container.name] = container

        if not args.NAME:
            header = ("CLUSTER", "OWNER", "STATUS", "MASTER FE", "CLOUD",
                      "CONFIG FILES")
            rows = []
            for name in sorted(clusters.keys()):
                cluster_info = clusters[name]
                service_statuses = {}
                for _, container in cluster_info["services"].items():
                    status = SERVICE_DEAD if isinstance(
                        container, ComposeService) else container.status
                    service_statuses[status] = service_statuses.get(status,
                                                                    0) + 1
                show_status = ",".join([
                    "{}({})".format(status, count)
                    for status, count in service_statuses.items()
                ])
                owner = utils.get_path_owner(CLUSTER.get_cluster_path(name))
                compose_file = CLUSTER.get_compose_file(name)

                is_cloud = ""
                try:
                    cluster = CLUSTER.Cluster.load(name)
                    is_cloud = "true" if cluster.is_cloud else "false"
                except Exception:
                    pass

                rows.append((name, owner, show_status,
                             CLUSTER.get_master_fe_endpoint(name), is_cloud,
                             "{}{}".format(compose_file,
                                           cluster_info["status"])))
            return self._handle_data(header, rows)

        header = [
            "CLUSTER", "NAME", "IP", "STATUS", "CONTAINER ID", "IMAGE",
            "CREATED", "alive", "is_master", "backend_id", "tablet_num",
            "last_heartbeat", "err_msg"
        ]
        if args.detail:
            header += [
                "query_port",
                "http_port",
            ]

        rows = []
        for cluster_name in sorted(clusters.keys()):
            fe_ids = {}
            be_ids = {}
            services = clusters[cluster_name]["services"]
            db_mgr = database.get_db_mgr(cluster_name, False)

            nodes = []
            for service_name, container in services.items():
                _, node_type, id = utils.parse_service_name(container.name)
                node = ListNode()
                node.cluster_name = cluster_name
                node.node_type = node_type
                node.id = id
                node.update_db_info(db_mgr)
                nodes.append(node)

                if node_type == CLUSTER.Node.TYPE_FE:
                    fe_ids[id] = True
                elif node_type == CLUSTER.Node.TYPE_BE:
                    be_ids[id] = True

                if isinstance(container, ComposeService):
                    node.ip = container.ip
                    node.image = container.image
                    node.status = SERVICE_DEAD
                else:
                    node.created = dateutil.parser.parse(
                        container.attrs.get("Created")).astimezone().strftime(
                            "%Y-%m-%d %H:%M:%S")
                    node.ip = list(
                        container.attrs["NetworkSettings"]
                        ["Networks"].values())[0]["IPAMConfig"]["IPv4Address"]
                    node.image = ",".join(container.image.tags)
                    node.container_id = container.short_id
                    node.status = container.status
                    if node.container_id and \
                            node_type in (CLUSTER.Node.TYPE_FDB,
                                          CLUSTER.Node.TYPE_MS,
                                          CLUSTER.Node.TYPE_RECYCLE):
                        node.alive = "true"

            for id, fe in db_mgr.fe_states.items():
                if fe_ids.get(id, False):
                    continue
                node = ListNode()
                node.cluster_name = cluster_name
                node.node_type = CLUSTER.Node.TYPE_FE
                node.id = id
                node.status = SERVICE_DEAD
                node.update_db_info(db_mgr)
                nodes.append(node)
            for id, be in db_mgr.be_states.items():
                if be_ids.get(id, False):
                    continue
                node = ListNode()
                node.cluster_name = cluster_name
                node.node_type = CLUSTER.Node.TYPE_BE
                node.id = id
                node.status = SERVICE_DEAD
                node.update_db_info(db_mgr)
                nodes.append(node)

            def get_node_seq(node):
                return CLUSTER.get_node_seq(node.node_type, node.id)

            for node in sorted(nodes, key=get_node_seq):
                rows.append(node.info(args.detail))

        return self._handle_data(header, rows)


ALL_COMMANDS = [
    UpCommand("up"),
    DownCommand("down"),
    SimpleCommand("start", "Start the doris containers."),
    SimpleCommand("stop", "Stop the doris containers."),
    SimpleCommand("restart", "Restart the doris containers."),
    SimpleCommand("pause", "Pause the doris containers."),
    SimpleCommand("unpause", "Unpause the doris containers."),
    GenConfCommand("config"),
    ListCommand("ls"),
]