[Feature](doris compose) A tool to set up and manage doris docker clusters easily (#21649)

yujun
2023-07-12 22:13:38 +08:00
committed by GitHub
parent 00c48f7d46
commit 2e3d15b552
11 changed files with 1831 additions and 0 deletions


@ -0,0 +1,30 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# choose a base image
FROM openjdk:8u342-jdk
# set environment variables
ENV JAVA_HOME="/usr/local/openjdk-8/"
ADD output /opt/apache-doris/
RUN apt-get update && \
apt-get install -y default-mysql-client python && \
apt-get clean


@ -0,0 +1,102 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Doris compose
Use doris compose to create doris docker compose clusters.
## Requirements
1. The doris image should contain:
```
/opt/apache-doris/{fe, be}
```
If Doris was built with `sh build.sh`, its output already satisfies this layout. Then run the following command in the Doris root directory
```
docker build -f docker/runtime/doris-compose/Dockerfile -t <image> .
```
to generate an image.
2. Install the Python dependencies listed in 'docker/runtime/doris-compose/requirements.txt'
```
python -m pip install --user -r docker/runtime/doris-compose/requirements.txt
```
## Usage
### Create a cluster or recreate its containers
```
python docker/runtime/doris-compose/doris-compose.py up <cluster-name> <image?>
--add-fe-num <add-fe-num> --add-be-num <add-be-num>
--fe-id <fe-id> --be-id <be-id>
```
If it's a new cluster, the image must be specified.
Add FE/BE nodes with the specified image, or update existing nodes with `--fe-id`, `--be-id`.
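For example, a minimal sketch of creating a new cluster and then growing it (the cluster name `mydoris` and image tag `my-doris:latest` below are placeholders, not part of this PR):
```
# create a new cluster; with no --add-fe-num/--add-be-num it defaults to 3 FE and 3 BE
python docker/runtime/doris-compose/doris-compose.py up mydoris my-doris:latest
# later, add one more BE to the same cluster
python docker/runtime/doris-compose/doris-compose.py up mydoris --add-be-num 1
```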
### Remove node from the cluster
```
python docker/runtime/doris-compose/doris-compose.py down <cluster-name> --fe-id <fe-id> --be-id <be-id> [--clean] [--drop-force]
```
Stop and remove the containers, and remove the nodes from the DB.
For a BE node, if `--drop-force` is specified, a `DROPP BACKEND` statement is sent to the FE; otherwise a `DECOMMISSION BACKEND` statement is sent.
If `--clean` is specified, the node's data is deleted too.
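For example (the cluster name and node ids below are placeholders):
```
# gracefully decommission BE 2, then delete its local data
python docker/runtime/doris-compose/doris-compose.py down mydoris --be-id 2 --clean
# drop BE 3 immediately, without migrating its tablets
python docker/runtime/doris-compose/doris-compose.py down mydoris --be-id 3 --drop-force
```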
### Start, stop, restart specific nodes
```
python docker/runtime/doris-compose/doris-compose.py start <cluster-name> --fe-id <multiple fe ids> --be-id <multiple be ids>
python docker/runtime/doris-compose/doris-compose.py restart <cluster-name> --fe-id <multiple fe ids> --be-id <multiple be ids>
```
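For example, to restart only FE 1 plus BE 1 and 2 (ids and cluster name are placeholders):
```
python docker/runtime/doris-compose/doris-compose.py restart mydoris --fe-id 1 --be-id 1 2
```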
### List doris cluster
```
python docker/runtime/doris-compose/doris-compose.py ls <-a> <multiple cluster names>
```
If cluster names are specified, all of those clusters' nodes are listed.
Otherwise only a summary of each cluster is shown. Without `-a`, only active clusters are listed;
with `-a`, inactive clusters are listed too.
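For example:
```
# summary of all clusters, including inactive ones
python docker/runtime/doris-compose/doris-compose.py ls -a
# detailed node list for one cluster (name is a placeholder)
python docker/runtime/doris-compose/doris-compose.py ls mydoris
```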
There are more options for doris-compose. Just try
```
python docker/runtime/doris-compose/doris-compose.py <command> -h
```


@ -0,0 +1,438 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
import jsonpickle
import os
import os.path
import utils
DOCKER_DORIS_PATH = "/opt/apache-doris"
LOCAL_DORIS_PATH = os.getenv("LOCAL_DORIS_PATH")
if not LOCAL_DORIS_PATH:
LOCAL_DORIS_PATH = "/tmp/doris"
LOCAL_RESOURCE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)),
"resource")
DOCKER_RESOURCE_PATH = os.path.join(DOCKER_DORIS_PATH, "resource")
FE_HTTP_PORT = 8030
FE_RPC_PORT = 9020
FE_QUERY_PORT = 9030
FE_EDITLOG_PORT = 9010
BE_PORT = 9060
BE_WEBSVR_PORT = 8040
BE_HEARTBEAT_PORT = 9050
BE_BRPC_PORT = 8060
ID_LIMIT = 10000
IP_PART4_SIZE = 200
LOG = utils.get_logger()
def get_cluster_path(cluster_name):
return os.path.join(LOCAL_DORIS_PATH, cluster_name)
def get_compose_file(cluster_name):
return os.path.join(get_cluster_path(cluster_name), "docker-compose.yml")
def get_status_path(cluster_name):
return os.path.join(get_cluster_path(cluster_name), "status")
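# Pick an unused /16 subnet prefix "A.B" (A in 128..191) for a new cluster, skipping prefixes already used by existing docker networks or other clusters.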
def gen_subnet_prefix16():
used_subnet = utils.get_docker_subnets_prefix16()
if os.path.exists(LOCAL_DORIS_PATH):
for cluster_name in os.listdir(LOCAL_DORIS_PATH):
try:
cluster = Cluster.load(cluster_name)
used_subnet[cluster.subnet] = True
except:
pass
for i in range(128, 192):
for j in range(256):
subnet = "{}.{}".format(i, j)
if not used_subnet.get(subnet, False):
return subnet
raise Exception("Failed to gen subnet")
class NodeMeta(object):
def __init__(self, image):
self.image = image
class Group(object):
def __init__(self, node_type):
self.node_type = node_type
self.nodes = {} # id : NodeMeta
self.next_id = 1
def add(self, id, image):
assert image
if not id:
id = self.next_id
self.next_id += 1
if self.get_node(id):
raise Exception(
"Failed to add {} with id {}, id has exists".format(
self.node_type, id))
if id > ID_LIMIT:
raise Exception(
"Failed to add {} with id {}, id exceeds {}".format(
self.node_type, id, ID_LIMIT))
self.nodes[id] = NodeMeta(image)
return id
def remove(self, id):
self.nodes.pop(id, None)
def get_node_num(self):
return len(self.nodes)
def get_all_nodes(self):
return self.nodes
def get_node(self, id):
return self.nodes.get(id, None)
def on_loaded(self):
nodes = {}
for id, node in self.nodes.items():
nodes[int(id)] = node
self.nodes = nodes
class Node(object):
TYPE_FE = "fe"
TYPE_BE = "be"
TYPE_ALL = [TYPE_FE, TYPE_BE]
def __init__(self, cluster_name, id, subnet, meta):
self.cluster_name = cluster_name
self.id = id
self.subnet = subnet
self.meta = meta
@staticmethod
def new(cluster_name, node_type, id, subnet, meta):
if node_type == Node.TYPE_FE:
return FE(cluster_name, id, subnet, meta)
elif node_type == Node.TYPE_BE:
return BE(cluster_name, id, subnet, meta)
else:
raise Exception("Unknown node type {}".format(node_type))
def init_dir(self):
path = self.get_path()
os.makedirs(path, exist_ok=True)
# copy config to local
conf_dir = os.path.join(path, "conf")
if not os.path.exists(conf_dir) or utils.is_dir_empty(conf_dir):
utils.copy_image_directory(
self.get_image(), "{}/{}/conf".format(DOCKER_DORIS_PATH,
self.node_type()),
conf_dir)
assert not utils.is_dir_empty(conf_dir), "conf directory {} is empty, " \
"check doris path in image is correct".format(conf_dir)
for sub_dir in self.expose_sub_dirs():
os.makedirs(os.path.join(path, sub_dir), exist_ok=True)
def is_fe(self):
return self.node_type() == Node.TYPE_FE
def is_be(self):
return self.node_type() == Node.TYPE_BE
def node_type(self):
raise Exception("No implemented")
def expose_sub_dirs(self):
return ["conf", "log"]
def get_name(self):
return "{}-{}".format(self.node_type(), self.id)
def get_path(self):
return os.path.join(get_cluster_path(self.cluster_name),
self.get_name())
def get_image(self):
return self.meta.image
def set_image(self, image):
self.meta.image = image
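# Map (node type, id) to a deterministic IP inside the cluster's /16 subnet; FE and BE ids use disjoint ranges, so get_id_from_ip() can recover the id from the last two octets.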
def get_ip(self):
seq = self.id
seq += IP_PART4_SIZE
if self.node_type() == Node.TYPE_FE:
seq += 0 * ID_LIMIT
elif self.node_type() == Node.TYPE_BE:
seq += 1 * ID_LIMIT
else:
seq += 2 * ID_LIMIT
return "{}.{}.{}".format(self.subnet, int(seq / IP_PART4_SIZE),
seq % IP_PART4_SIZE)
@staticmethod
def get_id_from_ip(ip):
pos2 = ip.rfind(".")
pos1 = ip.rfind(".", 0, pos2 - 1)
num3 = int(ip[pos1 + 1:pos2])
num4 = int(ip[pos2 + 1:])
seq = num3 * IP_PART4_SIZE + num4
while seq > ID_LIMIT:
seq -= ID_LIMIT
seq -= IP_PART4_SIZE
return seq
def service_name(self):
return utils.with_doris_prefix("{}-{}".format(self.cluster_name,
self.get_name()))
def docker_env(self):
return {
"MY_IP": self.get_ip(),
"MY_ID": self.id,
"FE_QUERY_PORT": FE_QUERY_PORT,
"FE_EDITLOG_PORT": FE_EDITLOG_PORT,
"BE_HEARTBEAT_PORT": BE_HEARTBEAT_PORT,
"DORIS_HOME": os.path.join(DOCKER_DORIS_PATH, self.node_type()),
}
def docker_ports(self):
raise Exception("No implemented")
def compose(self):
return {
"cap_add": ["SYS_PTRACE"],
"hostname":
self.get_name(),
"container_name":
self.service_name(),
"command":
self.docker_command(),
"environment":
self.docker_env(),
"image":
self.get_image(),
"networks": {
utils.with_doris_prefix(self.cluster_name): {
"ipv4_address": self.get_ip(),
}
},
"ports":
self.docker_ports(),
"ulimits": {
"core": -1
},
"security_opt": ["seccomp:unconfined"],
"volumes": [
"{}:{}/{}/{}".format(os.path.join(self.get_path(),
sub_dir), DOCKER_DORIS_PATH,
self.node_type(), sub_dir)
for sub_dir in self.expose_sub_dirs()
] + [
"{}:{}:ro".format(LOCAL_RESOURCE_PATH, DOCKER_RESOURCE_PATH),
"{}:{}/{}/status".format(get_status_path(self.cluster_name),
DOCKER_DORIS_PATH, self.node_type()),
] + [
"{0}:{0}:ro".format(path)
for path in ("/etc/localtime", "/etc/timezone",
"/usr/share/zoneinfo") if os.path.exists(path)
],
}
class FE(Node):
def docker_command(self):
return [
"bash",
os.path.join(DOCKER_RESOURCE_PATH, "init_fe.sh"),
#"{}/fe/bin/init_fe.sh".format(DOCKER_DORIS_PATH),
]
def docker_ports(self):
return [FE_HTTP_PORT, FE_EDITLOG_PORT, FE_RPC_PORT, FE_QUERY_PORT]
def node_type(self):
return Node.TYPE_FE
def expose_sub_dirs(self):
return super().expose_sub_dirs() + ["doris-meta"]
class BE(Node):
def docker_command(self):
return [
"bash",
os.path.join(DOCKER_RESOURCE_PATH, "init_be.sh"),
#"{}/be/bin/init_be.sh".format(DOCKER_DORIS_PATH),
]
def docker_ports(self):
return [BE_WEBSVR_PORT, BE_BRPC_PORT, BE_HEARTBEAT_PORT, BE_PORT]
def node_type(self):
return Node.TYPE_BE
def expose_sub_dirs(self):
return super().expose_sub_dirs() + ["storage"]
class Cluster(object):
def __init__(self, name, subnet, image):
self.name = name
self.subnet = subnet
self.image = image
self.groups = {
node_type: Group(node_type)
for node_type in Node.TYPE_ALL
}
@staticmethod
def new(name, image):
subnet = gen_subnet_prefix16()
cluster = Cluster(name, subnet, image)
os.makedirs(cluster.get_path(), exist_ok=True)
os.makedirs(get_status_path(name), exist_ok=True)
return cluster
@staticmethod
def load(name):
if not name:
raise Exception("Failed to load cluster, name is empty")
path = get_cluster_path(name)
if not os.path.exists(path):
raise Exception(
"Failed to load cluster, its directory {} not exists.".format(
path))
meta_path = Cluster._get_meta_file(name)
if not os.path.exists(meta_path):
raise Exception(
"Failed to load cluster, its meta file {} not exists.".format(
meta_path))
with open(meta_path, "r") as f:
cluster = jsonpickle.loads(f.read())
for group in cluster.groups.values():
group.on_loaded()
return cluster
@staticmethod
def _get_meta_file(name):
return os.path.join(get_cluster_path(name), "meta")
def get_image(self):
return self.image
# the cluster's nodes also update their image when the cluster image is updated.
def set_image(self, image):
self.image = image
for _, group in self.groups.items():
for _, node_meta in group.nodes.items():
node_meta.image = image
def get_path(self):
return get_cluster_path(self.name)
def get_group(self, node_type):
group = self.groups.get(node_type, None)
if not group:
raise Exception("Unknown node_type: {}".format(node_type))
return group
def get_node(self, node_type, id):
group = self.get_group(node_type)
meta = group.get_node(id)
if not meta:
raise Exception("No found {} with id {}".format(node_type, id))
return Node.new(self.name, node_type, id, self.subnet, meta)
def get_all_nodes(self, node_type):
group = self.groups.get(node_type, None)
if not group:
raise Exception("Unknown node_type: {}".format(node_type))
return [
Node.new(self.name, node_type, id, self.subnet, meta)
for id, meta in group.get_all_nodes().items()
]
def get_all_nodes_num(self):
num = 0
for group in self.groups.values():
num += group.get_node_num()
return num
def add(self, node_type, id=None):
id = self.get_group(node_type).add(id, self.image)
node = self.get_node(node_type, id)
node.init_dir()
return node
def remove(self, node_type, id):
group = self.get_group(node_type)
group.remove(id)
def save(self):
self._save_meta()
self._save_compose()
def _save_meta(self):
with open(Cluster._get_meta_file(self.name), "w") as f:
f.write(jsonpickle.dumps(self, indent=2))
def _save_compose(self):
services = {}
for node_type in self.groups.keys():
for node in self.get_all_nodes(node_type):
services[node.service_name()] = node.compose()
compose = {
"version": "3",
"networks": {
utils.with_doris_prefix(self.name): {
"driver": "bridge",
"ipam": {
"config": [{
"subnet": "{}.0.0/16".format(self.subnet),
}]
}
}
},
"services": services,
}
utils.write_compose_file(self.get_compose_file(), compose)
def get_compose_file(self):
global get_compose_file
return get_compose_file(self.name)


@ -0,0 +1,543 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import argparse
import cluster as CLUSTER
import database
import utils
import os
import os.path
import prettytable
import shutil
import sys
LOG = utils.get_logger()
# return for_all, related_nodes, related_node_num
def get_ids_related_nodes(cluster, fe_ids, be_ids, ignore_not_exists=False):
if fe_ids is None and be_ids is None:
return True, None, cluster.get_all_nodes_num()
def get_ids_related_nodes_with_type(node_type, ids):
if ids is None:
return []
if not ids:
return cluster.get_all_nodes(node_type)
else:
nodes = []
for id in ids:
try:
nodes.append(cluster.get_node(node_type, id))
except Exception as e:
if ignore_not_exists:
LOG.warning(
utils.render_yellow(
"Not found {} with id {}".format(
node_type, id)))
else:
raise e
return nodes
nodes = get_ids_related_nodes_with_type(
CLUSTER.Node.TYPE_FE, fe_ids) + get_ids_related_nodes_with_type(
CLUSTER.Node.TYPE_BE, be_ids)
related_node_num = len(nodes)
return related_node_num == cluster.get_all_nodes_num(), nodes, related_node_num
class Command(object):
def __init__(self, name):
self.name = name
def add_parser(self, args_parsers):
raise Exception("No implemented")
def run(self, args):
raise Exception("No implemented")
def _add_parser_ids_args(self, parser):
group = parser.add_argument_group("for existing nodes",
"apply to the existing nodes.")
group.add_argument("--fe-id", nargs="*", type=int, help="Specify up fe ids, support multiple ids, " \
"if specific --fe-id but not specific ids, apply to all fe.")
group.add_argument("--be-id", nargs="*", type=int, help="Specify up be ids, support multiple ids, " \
"if specific --be but not specific ids, apply to all be.")
def _get_parser_bool_action(self, is_store_true):
if sys.version_info.major == 3 and sys.version_info.minor >= 9:
return argparse.BooleanOptionalAction
else:
return "store_true" if is_store_true else "store_false"
class SimpleCommand(Command):
def __init__(self, command, help):
super().__init__(command)
self.command = command
self.help = help
def add_parser(self, args_parsers):
help = self.help + " If none of --fe-id, --be-id is specified, then apply to all containers."
parser = args_parsers.add_parser(self.command, help=help)
parser.add_argument("NAME", help="Specify cluster name.")
self._add_parser_ids_args(parser)
def run(self, args):
cluster = CLUSTER.Cluster.load(args.NAME)
_, related_nodes, related_node_num = get_ids_related_nodes(
cluster, args.fe_id, args.be_id)
utils.exec_docker_compose_command(cluster.get_compose_file(),
self.command,
nodes=related_nodes)
show_cmd = self.command[0].upper() + self.command[1:]
LOG.info(
utils.render_green("{} succ, total related node num {}".format(
show_cmd, related_node_num)))
class UpCommand(Command):
def add_parser(self, args_parsers):
parser = args_parsers.add_parser("up", help="Create and upgrade doris containers, "\
"or add new containers. " \
"If none of --add-fe-num, --add-be-num, --fe-id, --be-id is specific, " \
"then apply to all containers.")
parser.add_argument("NAME", default="", help="Specific cluster name.")
parser.add_argument("IMAGE",
default="",
nargs="?",
help="Specify docker image.")
group1 = parser.add_argument_group("add new nodes",
"add cluster nodes.")
group1.add_argument(
"--add-fe-num",
type=int,
help="Specify add fe num, default 3 for a new cluster.")
group1.add_argument(
"--add-be-num",
type=int,
help="Specify add be num, default 3 for a new cluster.")
self._add_parser_ids_args(parser)
group2 = parser.add_mutually_exclusive_group()
group2.add_argument(
"--no-start",
default=False,
action=self._get_parser_bool_action(True),
help="Not start containers, create or update config image only.")
group2.add_argument("--force-recreate",
default=False,
action=self._get_parser_bool_action(True),
help="Recreate containers even if their configuration" \
"and image haven't changed. ")
def run(self, args):
if not args.NAME:
raise Exception("Need specific not empty cluster name")
for_all = True
try:
cluster = CLUSTER.Cluster.load(args.NAME)
if args.fe_id != None or args.be_id != None or args.add_fe_num or args.add_be_num:
for_all = False
except:
# a new cluster
if not args.IMAGE:
raise Exception("New cluster must specific image")
if args.fe_id != None:
args.fe_id = None
LOG.warning(
utils.render_yellow("Ignore --fe-id for new cluster"))
if args.be_id != None:
args.be_id = None
LOG.warning(
utils.render_yellow("Ignore --be-id for new cluster"))
cluster = CLUSTER.Cluster.new(args.NAME, args.IMAGE)
LOG.info("Create new cluster {} succ, cluster path is {}".format(
args.NAME, cluster.get_path()))
if not args.add_fe_num:
args.add_fe_num = 3
if not args.add_be_num:
args.add_be_num = 3
_, related_nodes, _ = get_ids_related_nodes(cluster, args.fe_id,
args.be_id)
if not related_nodes:
related_nodes = []
if args.add_fe_num:
for i in range(args.add_fe_num):
related_nodes.append(cluster.add(CLUSTER.Node.TYPE_FE))
if args.add_be_num:
for i in range(args.add_be_num):
related_nodes.append(cluster.add(CLUSTER.Node.TYPE_BE))
if args.IMAGE:
for node in related_nodes:
node.set_image(args.IMAGE)
if for_all and args.IMAGE:
cluster.set_image(args.IMAGE)
cluster.save()
options = []
if args.no_start:
options.append("--no-start")
else:
options = ["-d", "--remove-orphans"]
if args.force_recreate:
options.append("--force-recreate")
related_node_num = len(related_nodes)
if for_all:
related_node_num = cluster.get_all_nodes_num()
related_nodes = None
utils.exec_docker_compose_command(cluster.get_compose_file(), "up",
options, related_nodes)
if args.no_start:
LOG.info(
utils.render_green(
"Not up cluster cause specific --no-start, related node num {}"
.format(related_node_num)))
else:
LOG.info(
utils.render_green(
"Up cluster {} succ, related node num {}".format(
args.NAME, related_node_num)))
class DownCommand(Command):
def add_parser(self, args_parsers):
parser = args_parsers.add_parser("down",
help="Down doris containers, networks. "\
"It will also remove node from DB. " \
"If none of --fe-id, --be-id is specific, "\
"then apply to all containers.")
parser.add_argument("NAME", help="Specify cluster name")
self._add_parser_ids_args(parser)
parser.add_argument(
"--clean",
default=False,
action=self._get_parser_bool_action(True),
help=
"Clean container related files, include expose data, config and logs"
)
parser.add_argument(
"--drop-force",
default=None,
action=self._get_parser_bool_action(True),
help="Drop doris node force. For be, if specific --drop-force, "\
"it will send dropp to fe, otherwise send decommission to fe.")
def run(self, args):
cluster = CLUSTER.Cluster.load(args.NAME)
for_all, related_nodes, related_node_num = get_ids_related_nodes(
cluster, args.fe_id, args.be_id, ignore_not_exists=True)
if for_all:
utils.exec_docker_compose_command(cluster.get_compose_file(),
"down",
["-v", "--remove-orphans"])
if args.clean:
utils.enable_dir_with_rw_perm(cluster.get_path())
shutil.rmtree(cluster.get_path())
LOG.info(
utils.render_yellow(
"Clean cluster data cause has specific --clean"))
else:
db_mgr = database.get_db_mgr(cluster.name)
for node in related_nodes:
if node.is_fe():
fe_endpoint = "{}:{}".format(node.get_ip(),
CLUSTER.FE_EDITLOG_PORT)
db_mgr.drop_fe(fe_endpoint)
elif node.is_be():
be_endpoint = "{}:{}".format(node.get_ip(),
CLUSTER.BE_HEARTBEAT_PORT)
if args.drop_force:
db_mgr.drop_be(be_endpoint)
else:
db_mgr.decommission_be(be_endpoint)
else:
raise Exception("Unknown node type: {}".format(
node.node_type()))
#utils.exec_docker_compose_command(cluster.get_compose_file(),
# "stop",
# nodes=[node])
utils.exec_docker_compose_command(cluster.get_compose_file(),
"rm", ["-s", "-v", "-f"],
nodes=[node])
if args.clean:
utils.enable_dir_with_rw_perm(node.get_path())
shutil.rmtree(node.get_path())
LOG.info(
utils.render_yellow(
"Clean {} with id {} data cause has specific --clean"
.format(node.node_type(), node.id)))
cluster.remove(node.node_type(), node.id)
cluster.save()
LOG.info(
utils.render_green(
"Down cluster {} succ, related node num {}".format(
args.NAME, related_node_num)))
class ListNode(object):
def __init__(self):
self.node_type = ""
self.id = 0
self.cluster_name = ""
self.ip = ""
self.status = ""
self.container_id = ""
self.image = ""
self.created = ""
self.alive = ""
self.is_master = ""
self.query_port = ""
self.tablet_num = ""
self.last_heartbeat = ""
self.err_msg = ""
def info(self):
return (self.cluster_name, "{}-{}".format(self.node_type, self.id),
self.ip, self.status, self.container_id, self.image,
self.created, self.alive, self.is_master, self.query_port,
self.tablet_num, self.last_heartbeat, self.err_msg)
def update_db_info(self, db_mgr):
if self.node_type == CLUSTER.Node.TYPE_FE:
fe = db_mgr.get_fe(self.id)
if fe:
self.alive = str(fe.alive).lower()
self.is_master = str(fe.is_master).lower()
self.query_port = fe.query_port
self.last_heartbeat = fe.last_heartbeat
self.err_msg = fe.err_msg
elif self.node_type == CLUSTER.Node.TYPE_BE:
be = db_mgr.get_be(self.id)
if be:
self.alive = str(be.alive).lower()
self.tablet_num = be.tablet_num
self.last_heartbeat = be.last_heartbeat
self.err_msg = be.err_msg
class ListCommand(Command):
def add_parser(self, args_parsers):
parser = args_parsers.add_parser(
"ls", help="List running doris compose clusters.")
parser.add_argument(
"NAME",
nargs="*",
help=
"Specify multiple clusters, if specific, show all their containers."
)
parser.add_argument(
"-a",
"--all",
default=False,
action=self._get_parser_bool_action(True),
help="Show all stopped and bad doris compose projects")
def run(self, args):
COMPOSE_MISSING = "(missing)"
COMPOSE_BAD = "(bad)"
COMPOSE_GOOD = ""
SERVICE_DEAD = "dead"
class ComposeService(object):
def __init__(self, name, ip, image):
self.name = name
self.ip = ip
self.image = image
def parse_cluster_compose_file(cluster_name):
compose_file = CLUSTER.get_compose_file(cluster_name)
if not os.path.exists(compose_file):
return COMPOSE_MISSING, {}
try:
compose = utils.read_compose_file(compose_file)
if not compose:
return COMPOSE_BAD, {}
services = compose.get("services", {})
if services is None:
return COMPOSE_BAD, {}
return COMPOSE_GOOD, {
service:
ComposeService(
service,
list(service_conf["networks"].values())[0]
["ipv4_address"], service_conf["image"])
for service, service_conf in services.items()
}
except:
return COMPOSE_BAD, {}
clusters = {}
search_names = []
if args.NAME:
search_names = args.NAME
elif os.path.exists(CLUSTER.LOCAL_DORIS_PATH):
search_names = os.listdir(CLUSTER.LOCAL_DORIS_PATH)
for cluster_name in search_names:
status, services = parse_cluster_compose_file(cluster_name)
clusters[cluster_name] = {"status": status, "services": services}
docker_clusters = utils.get_doris_containers(args.NAME)
for cluster_name, containers in docker_clusters.items():
cluster_info = clusters.get(cluster_name, None)
if not cluster_info:
cluster_info = {"status": COMPOSE_MISSING, "services": {}}
clusters[cluster_name] = cluster_info
for container in containers:
#if container.status == "running" and cluster_info[
# "status"] == COMPOSE_GOOD and (
# container.name not in cluster_info["services"]):
# container.status = "orphans"
cluster_info["services"][container.name] = container
TYPE_COMPOSESERVICE = type(ComposeService("", "", ""))
if not args.NAME:
headers = (utils.render_green(field)
for field in ("CLUSTER", "STATUS", "CONFIG FILES"))
table = prettytable.PrettyTable(headers)
for name in sorted(clusters.keys()):
cluster_info = clusters[name]
service_statuses = {}
for _, container in cluster_info["services"].items():
status = SERVICE_DEAD if type(
container) == TYPE_COMPOSESERVICE else container.status
service_statuses[status] = service_statuses.get(status,
0) + 1
show_status = ",".join([
"{}({})".format(status, count)
for status, count in service_statuses.items()
])
if not args.all and service_statuses.get("running", 0) == 0:
continue
compose_file = CLUSTER.get_compose_file(name)
table.add_row(
(name, show_status, "{}{}".format(compose_file,
cluster_info["status"])))
print(table)
return
headers = (utils.render_green(field)
for field in ("CLUSTER", "NAME", "IP", "STATUS",
"CONTAINER ID", "IMAGE", "CREATED", "alive",
"is_master", "query_port", "tablet_num",
"last_heartbeat", "err_msg"))
table = prettytable.PrettyTable(headers)
for cluster_name in sorted(clusters.keys()):
fe_ids = {}
be_ids = {}
services = clusters[cluster_name]["services"]
db_mgr = database.get_db_mgr(cluster_name, False)
nodes = []
for service_name, container in services.items():
_, node_type, id = utils.parse_service_name(container.name)
node = ListNode()
node.cluster_name = cluster_name
node.node_type = node_type
node.id = id
node.update_db_info(db_mgr)
nodes.append(node)
if node_type == CLUSTER.Node.TYPE_FE:
fe_ids[id] = True
elif node_type == CLUSTER.Node.TYPE_BE:
be_ids[id] = True
if type(container) == TYPE_COMPOSESERVICE:
node.ip = container.ip
node.image = container.image
node.status = SERVICE_DEAD
else:
node.created = container.attrs.get("Created",
"")[:19].replace(
"T", " ")
node.ip = list(
container.attrs["NetworkSettings"]
["Networks"].values())[0]["IPAMConfig"]["IPv4Address"]
node.image = ",".join(container.image.tags)
node.container_id = container.short_id
node.status = container.status
for id, fe in db_mgr.fe_states.items():
if fe_ids.get(id, False):
continue
node = ListNode()
node.cluster_name = cluster_name
node.node_type = CLUSTER.Node.TYPE_FE
node.id = id
node.status = SERVICE_DEAD
node.update_db_info(db_mgr)
nodes.append(node)
for id, be in db_mgr.be_states.items():
if be_ids.get(id, False):
continue
node = ListNode()
node.cluster_name = cluster_name
node.node_type = CLUSTER.Node.TYPE_BE
node.id = id
node.status = SERVICE_DEAD
node.update_db_info(db_mgr)
nodes.append(node)
def get_key(node):
key = node.id
if node.node_type == CLUSTER.Node.TYPE_FE:
key += 0 * CLUSTER.ID_LIMIT
elif node.node_type == CLUSTER.Node.TYPE_BE:
key += 1 * CLUSTER.ID_LIMIT
else:
key += 2 * CLUSTER.ID_LIMIT
return key
for node in sorted(nodes, key=get_key):
table.add_row(node.info())
print(table)
ALL_COMMANDS = [
UpCommand("up"),
DownCommand("down"),
SimpleCommand("start", "Start the doris containers. "),
SimpleCommand("stop", "Stop the doris containers. "),
SimpleCommand("restart", "Restart the doris containers. "),
SimpleCommand("pause", "Pause the doris containers. "),
SimpleCommand("unpause", "Unpause the doris containers. "),
ListCommand("ls"),
]


@ -0,0 +1,239 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import cluster as CLUSTER
import os.path
import pymysql
import time
import utils
LOG = utils.get_logger()
class FEState(object):
def __init__(self, id, query_port, is_master, alive, last_heartbeat,
err_msg):
self.id = id
self.query_port = query_port
self.is_master = is_master
self.alive = alive
self.last_heartbeat = last_heartbeat
self.err_msg = err_msg
class BEState(object):
def __init__(self, id, decommissioned, alive, tablet_num, last_heartbeat,
err_msg):
self.id = id
self.decommissioned = decommissioned
self.alive = alive
self.tablet_num = tablet_num
self.last_heartbeat = last_heartbeat
self.err_msg = err_msg
class DBManager(object):
def __init__(self):
self.fe_states = {}
self.be_states = {}
self.query_port = -1
self.conn = None
def set_query_port(self, query_port):
self.query_port = query_port
def get_fe(self, id):
return self.fe_states.get(id, None)
def get_be(self, id):
return self.be_states.get(id, None)
def load_states(self, query_ports):
self._load_fe_states(query_ports)
self._load_be_states()
def drop_fe(self, fe_endpoint):
id = CLUSTER.Node.get_id_from_ip(fe_endpoint[:fe_endpoint.find(":")])
try:
self._exec_query(
"ALTER SYSTEM DROP FOLLOWER '{}'".format(fe_endpoint))
LOG.info("Drop fe {} with id {} from db succ.".format(
fe_endpoint, id))
except Exception as e:
if str(e).find("frontend does not exist") >= 0:
LOG.info(
"Drop fe {} with id {} from db succ cause it does not exist in db."
.format(fe_endpoint, id))
return
raise e
def drop_be(self, be_endpoint):
id = CLUSTER.Node.get_id_from_ip(be_endpoint[:be_endpoint.find(":")])
try:
self._exec_query(
"ALTER SYSTEM DROPP BACKEND '{}'".format(be_endpoint))
LOG.info("Drop be {} with id {} from db succ.".format(
be_endpoint, id))
except Exception as e:
if str(e).find("backend does not exists") >= 0:
LOG.info(
"Drop be {} with id {} from db succ cause it does not exist in db."
.format(be_endpoint, id))
return
raise e
def decommission_be(self, be_endpoint):
old_tablet_num = 0
id = CLUSTER.Node.get_id_from_ip(be_endpoint[:be_endpoint.find(":")])
if id not in self.be_states:
self._load_be_states()
if id in self.be_states:
be = self.be_states[id]
old_tablet_num = be.tablet_num
if not be.alive:
raise Exception("Decommission be {} with id {} fail " \
"cause it's not alive, maybe you should specific --drop-force " \
" to dropp it from db".format(be_endpoint, id))
try:
self._exec_query(
"ALTER SYSTEM DECOMMISSION BACKEND '{}'".format(be_endpoint))
LOG.info("Mark be {} with id {} as decommissioned, start migrate its tablets, " \
"wait migrating job finish.".format(be_endpoint, id))
except Exception as e:
if str(e).find("Backend does not exist") >= 0:
LOG.info("Decommission be {} with id {} from db succ " \
"cause it does not exist in db.",format(be_endpoint, id))
return
raise e
while True:
self._load_be_states()
be = self.be_states.get(id, None)
if not be:
LOG.info("Decommission be {} succ, total migrate {} tablets, " \
"has drop it from db.".format(be_endpoint, old_tablet_num))
return
LOG.info(
"Decommission be {} status: alive {}, decommissioned {}. " \
"It is migrating its tablets, left {}/{} tablets."
.format(be_endpoint, be.alive, be.decommissioned, be.tablet_num, old_tablet_num))
time.sleep(5)
def _load_fe_states(self, query_ports):
fe_states = {}
alive_master_fe_port = None
for record in self._exec_query("show frontends;"):
ip = record[1]
is_master = record[7] == "true"
alive = record[10] == "true"
last_heartbeat = record[12]
err_msg = record[14]
id = CLUSTER.Node.get_id_from_ip(ip)
query_port = query_ports.get(id, None)
fe = FEState(id, query_port, is_master, alive, last_heartbeat,
err_msg)
fe_states[id] = fe
if is_master and alive and query_port:
alive_master_fe_port = query_port
self.fe_states = fe_states
if alive_master_fe_port and alive_master_fe_port != self.query_port:
self.query_port = alive_master_fe_port
self._reset_conn()
def _load_be_states(self):
be_states = {}
for record in self._exec_query("show backends;"):
ip = record[1]
last_heartbeat = record[7]
alive = record[8] == "true"
decommissioned = record[9] == "true"
tablet_num = int(record[10])
err_msg = record[18]
id = CLUSTER.Node.get_id_from_ip(ip)
be = BEState(id, decommissioned, alive, tablet_num, last_heartbeat,
err_msg)
be_states[id] = be
self.be_states = be_states
def _exec_query(self, sql):
self._prepare_conn()
with self.conn.cursor() as cursor:
cursor.execute(sql)
return cursor.fetchall()
def _prepare_conn(self):
if self.conn:
return
if self.query_port <= 0:
raise Exception("Not set query_port")
self._reset_conn()
def _reset_conn(self):
self.conn = pymysql.connect(user="root",
host="127.0.0.1",
read_timeout = 10,
port=self.query_port)
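# Build a DBManager for a cluster: find its running FE containers, pick the master FE's mapped query port (falling back to fe-1 or any alive FE), then load FE/BE states through that connection.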
def get_db_mgr(cluster_name, required_load_succ=True):
assert cluster_name
db_mgr = DBManager()
containers = utils.get_doris_containers(cluster_name).get(
cluster_name, None)
if not containers:
return db_mgr
alive_fe_ports = {}
for container in containers:
if utils.is_container_running(container):
_, node_type, id = utils.parse_service_name(container.name)
if node_type == CLUSTER.Node.TYPE_FE:
query_port = utils.get_map_ports(container).get(
CLUSTER.FE_QUERY_PORT, None)
if query_port:
alive_fe_ports[id] = query_port
if not alive_fe_ports:
return db_mgr
master_fe_ip_file = os.path.join(CLUSTER.get_status_path(cluster_name),
"master_fe_ip")
query_port = None
if os.path.exists(master_fe_ip_file):
with open(master_fe_ip_file, "r") as f:
master_fe_ip = f.read()
if master_fe_ip:
master_id = CLUSTER.Node.get_id_from_ip(master_fe_ip)
query_port = alive_fe_ports.get(master_id, None)
if not query_port:
# A new cluster's master is fe-1
if 1 in alive_fe_ports:
query_port = alive_fe_ports[1]
else:
query_port = list(alive_fe_ports.values())[0]
db_mgr.set_query_port(query_port)
try:
db_mgr.load_states(alive_fe_ports)
except Exception as e:
if required_load_succ:
raise e
LOG.exception(e)
return db_mgr


@ -0,0 +1,48 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import argparse
import command
import utils
def parse_args():
ap = argparse.ArgumentParser(description="")
args_parsers = ap.add_subparsers(dest="command")
for cmd in command.ALL_COMMANDS:
cmd.add_parser(args_parsers)
return ap.parse_args(), ap.format_help()
def run(args, help):
timer = utils.Timer()
for cmd in command.ALL_COMMANDS:
if args.command == cmd.name:
return cmd.run(args)
timer.cancel()
print(help)
return -1
def main():
args, help = parse_args()
run(args, help)
if __name__ == '__main__':
main()


@ -0,0 +1,22 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
docker
docker-compose
jsonpickle
prettytable
pymysql


@ -0,0 +1,36 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
export MASTER_FE_IP=""
export MASTER_FE_IP_FILE=$DORIS_HOME/status/master_fe_ip
health_log() {
date >> "$DORIS_HOME/log/health.out"
echo "$@" >> "$DORIS_HOME/log/health.out"
}
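# Read the master FE ip published by the FE daemon into MASTER_FE_IP; return non-zero until the file is available.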
read_master_fe_ip() {
MASTER_FE_IP=`cat $MASTER_FE_IP_FILE`
if [ $? -eq 0 ]; then
health_log "master fe ${MASTER_FE_IP} has ready."
return 0
else
health_log "master fe has not ready."
return 1
fi
}


@ -0,0 +1,50 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
DIR=$(cd $(dirname $0);pwd)
source $DIR/common.sh
REGISTER_FILE=$DORIS_HOME/status/$MY_IP-register
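# Wait until the master FE ip is published, then register this BE with 'ALTER SYSTEM ADD BACKEND'; retry until it succeeds or the backend already exists.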
add_backend() {
while true; do
read_master_fe_ip
if [ $? -ne 0 ]; then
sleep 1
continue
fi
output=`mysql -P $FE_QUERY_PORT -h $MASTER_FE_IP -u root --execute "ALTER SYSTEM ADD BACKEND '$MY_IP:$BE_HEARTBEAT_PORT';" 2>&1`
res=$?
health_log "$output"
[ $res -eq 0 ] && break
(echo $output | grep "Same backend already exists") && break
sleep 1
done
touch $REGISTER_FILE
}
main() {
if [ ! -f $REGISTER_FILE ]; then
add_backend
fi
bash $DORIS_HOME/bin/start_be.sh
}
main


@ -0,0 +1,70 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
DIR=$(cd $(dirname $0);pwd)
source $DIR/common.sh
add_frontend() {
while true; do
read_master_fe_ip
if [ $? -ne 0 ]; then
sleep 1
continue
fi
output=`mysql -P $FE_QUERY_PORT -h $MASTER_FE_IP -u root --execute "ALTER SYSTEM ADD FOLLOWER '$MY_IP:$FE_EDITLOG_PORT';" 2>&1`
res=$?
health_log "$output"
[ $res -eq 0 ] && break
(echo $output | grep "frontend already exists") && break
sleep 1
done
}
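# Poll SHOW FRONTENDS; when this node reports itself as master, publish its ip to the shared status file so other nodes can find the master FE.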
fe_daemon() {
set +e
while true; do
output=`mysql -P $FE_QUERY_PORT -h $MY_IP -u root --execute "SHOW FRONTENDS;" | grep -w $MY_IP | awk '{print $8}' 2>&1`
if [ $? -ne 0 ]; then
health_log "$output"
else
echo $output | grep true
if [ $? -eq 0 ]; then
echo $MY_IP > $MASTER_FE_IP_FILE
if [ "$MASTER_FE_IP" != "$MY_IP" ]; then
health_log "change to master, last master is $MASTER_FE_IP"
MASTER_FE_IP=$MY_IP
fi
fi
fi
sleep 3
done
}
main() {
if [ "$MY_ID" = "1" -o -d "${DORIS_HOME}/doris-meta/image" ]; then
fe_daemon &
bash $DORIS_HOME/bin/start_fe.sh
else
add_frontend
fe_daemon &
$DORIS_HOME/bin/start_fe.sh --helper $MASTER_FE_IP:$FE_EDITLOG_PORT
fi
}
main


@ -0,0 +1,253 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import docker
import logging
import os
import subprocess
import time
import yaml
DORIS_PREFIX = "doris-"
class Timer(object):
def __init__(self):
self.start = time.time()
self.canceled = False
def __del__(self):
if not self.canceled:
LOG.info("=== Total run time: {} s".format(
int(time.time() - self.start)))
def cancel(self):
self.canceled = True
def get_logger(name=None):
logger = logging.getLogger(name)
if not logger.hasHandlers():
formatter = logging.Formatter(
'%(asctime)s - %(filename)s - %(lineno)dL - %(levelname)s - %(message)s'
)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.setLevel(logging.INFO)
return logger
LOG = get_logger()
def render_red(s):
return "\x1B[31m" + str(s) + "\x1B[0m"
def render_green(s):
return "\x1B[32m" + str(s) + "\x1B[0m"
def render_yellow(s):
return "\x1B[33m" + str(s) + "\x1B[0m"
def render_blue(s):
return "\x1B[34m" + str(s) + "\x1B[0m"
def with_doris_prefix(name):
return DORIS_PREFIX + name
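# A service/container name looks like doris-<cluster>-<fe|be>-<id>; return (cluster_name, node_type, id), or (None, None, None) if the name does not match.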
def parse_service_name(service_name):
import cluster
if not service_name or not service_name.startswith(DORIS_PREFIX):
return None, None, None
pos2 = service_name.rfind("-")
if pos2 < 0:
return None, None, None
id = None
try:
id = int(service_name[pos2 + 1:])
except:
return None, None, None
pos1 = service_name.rfind("-", len(DORIS_PREFIX), pos2 - 1)
if pos1 < 0:
return None, None, None
node_type = service_name[pos1 + 1:pos2]
if node_type not in cluster.Node.TYPE_ALL:
return None, None, None
return service_name[len(DORIS_PREFIX):pos1], node_type, id
def get_map_ports(container):
return {
int(inner.replace("/tcp", "")): int(outer[0]["HostPort"])
for inner, outer in container.attrs.get("NetworkSettings", {}).get(
"Ports", {}).items()
}
def is_container_running(container):
return container.status == "running"
# return all doris containers when cluster_names is empty
def get_doris_containers(cluster_names):
if cluster_names:
if type(cluster_names) == type(""):
filter_names = "{}{}-*".format(DORIS_PREFIX, cluster_names)
else:
filter_names = "|".join([
"{}{}-*".format(DORIS_PREFIX, name) for name in cluster_names
])
else:
filter_names = "{}*".format(DORIS_PREFIX)
clusters = {}
client = docker.client.from_env()
containers = client.containers.list(filters={"name": filter_names})
for container in containers:
cluster_name, _, _ = parse_service_name(container.name)
if not cluster_name:
continue
if cluster_names and cluster_name not in cluster_names:
continue
if cluster_name not in clusters:
clusters[cluster_name] = []
clusters[cluster_name].append(container)
return clusters
def get_doris_running_containers(cluster_name):
return {
container.name: container
for container in get_doris_containers(cluster_name).get(
cluster_name, []) if is_container_running(container)
}
def is_dir_empty(dir):
return False if os.listdir(dir) else True
def exec_shell_command(command, ignore_errors=False):
LOG.info("Exec command: {}".format(command))
p = subprocess.Popen(command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
out = p.communicate()[0].decode('utf-8')
if not ignore_errors:
assert p.returncode == 0, out
if out:
print(out)
return p.returncode, out
def exec_docker_compose_command(compose_file,
command,
options=None,
nodes=None,
user_command=None):
if nodes != None and not nodes:
return 0, "Skip"
compose_cmd = "docker-compose -f {} {} {} {} {}".format(
compose_file, command, " ".join(options) if options else "",
" ".join([node.service_name() for node in nodes]) if nodes else "",
user_command if user_command else "")
return exec_shell_command(compose_cmd)
def get_docker_subnets_prefix16():
subnet_prefixes = {}
client = docker.from_env()
for network in client.networks.list():
if not network.attrs:
continue
ipam = network.attrs.get("IPAM", None)
if not ipam:
continue
configs = ipam.get("Config", None)
if not configs:
continue
for config in configs:
subnet = config.get("Subnet", None)
if not subnet:
continue
pos1 = subnet.find(".")
if pos1 <= 0:
continue
pos2 = subnet.find(".", pos1 + 1)
if pos2 <= 0:
continue
num1 = subnet[0:pos1]
num2 = subnet[pos1 + 1:pos2]
network_part_len = 16
pos = subnet.find("/")
if pos != -1:
network_part_len = int(subnet[pos + 1:])
if network_part_len < 16:
for i in range(256):
subnet_prefixes["{}.{}".format(num1, i)] = True
else:
subnet_prefixes["{}.{}".format(num1, num2)] = True
LOG.debug("Get docker subnet prefixes: {}".format(subnet_prefixes))
return subnet_prefixes
def copy_image_directory(image, image_dir, local_dir):
client = docker.from_env()
volumes = ["{}:/opt/mount".format(local_dir)]
if image_dir.endswith("/"):
image_dir += "."
elif not image_dir.endswith("."):
image_dir += "/."
client.containers.run(
image,
remove=True,
volumes=volumes,
entrypoint="cp -r {} /opt/mount/".format(image_dir))
def enable_dir_with_rw_perm(dir):
if not os.path.exists(dir):
return
client = docker.client.from_env()
client.containers.run("ubuntu",
remove=True,
volumes=["{}:/opt/mount".format(dir)],
entrypoint="chmod a+rw -R {}".format("/opt/mount"))
def read_compose_file(file):
with open(file, "r") as f:
return yaml.safe_load(f.read())
def write_compose_file(file, compose):
with open(file, "w") as f:
f.write(yaml.dump(compose))