Files
doris/docker/runtime/doris-compose/utils.py

295 lines
7.9 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import docker
import json
import logging
import os
import pwd
import subprocess
import time
import yaml
# Prefix shared by every doris-compose service/container name.
DORIS_PREFIX = "doris-"
# Module-wide logger; stays None until get_logger() assigns it.
LOG = None
# Global switch read by is_enable_log() and toggled by set_enable_log().
ENABLE_LOG = True
class Timer(object):
    """Stopwatch that reports elapsed wall-clock time through LOG.

    The clock starts when the instance is created. ``show()`` logs the
    whole seconds elapsed since then, unless ``cancel()`` was called.
    """

    def __init__(self):
        self.start = time.time()
        self.canceled = False

    def show(self):
        """Log the total run time in whole seconds; no-op once canceled."""
        if self.canceled:
            return
        elapsed_seconds = int(time.time() - self.start)
        LOG.info("=== Total run time: {} s".format(elapsed_seconds))

    def cancel(self):
        """Suppress the output of any later ``show()`` call."""
        self.canceled = True
def set_enable_log(enabled):
    """Switch logging on or off.

    Stores ``enabled`` in the module-level ENABLE_LOG flag and sets the
    shared logger's ``disabled`` attribute to the opposite value.
    """
    global ENABLE_LOG
    ENABLE_LOG = enabled
    shared_logger = get_logger()
    shared_logger.disabled = not enabled
def is_enable_log():
    """Return the current value of the module-level ENABLE_LOG flag."""
    enabled = ENABLE_LOG
    return enabled
def get_logger(name=None):
    """Return the module-wide logger, creating it on the first call.

    The first call obtains a logger from ``logging.getLogger(name)`` and
    caches it in the module-level ``LOG``. Every later call returns the
    cached instance and ignores ``name``.

    Args:
        name: logger name, used only by the first call.

    Returns:
        The shared ``logging.Logger`` instance.
    """
    global LOG
    # Identity test: None is a singleton, so `is not` is the correct check.
    if LOG is not None:
        return LOG

    logger = logging.getLogger(name)
    # Configure only when no handler is reachable yet, so repeated setup
    # never produces duplicated log lines.
    if not logger.hasHandlers():
        formatter = logging.Formatter(
            '%(asctime)s - %(filename)s - %(lineno)dL - %(levelname)s - %(message)s'
        )
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        ch.setFormatter(formatter)
        logger.addHandler(ch)
        # NOTE(review): assumed to belong to this branch, i.e. the level of
        # an already-configured logger is left alone -- confirm.
        logger.setLevel(logging.INFO)

    LOG = logger
    return logger
# Create the shared logger at import time so that LOG is set before any
# function in this module uses it directly.
get_logger()
def render_red(s):
    """Return ``str(s)`` wrapped in the ANSI red (31) escape and a reset."""
    return "".join(("\x1B[31m", str(s), "\x1B[0m"))
def render_green(s):
    """Return ``str(s)`` wrapped in the ANSI green (32) escape and a reset."""
    return "".join(("\x1B[32m", str(s), "\x1B[0m"))
def render_yellow(s):
    """Return ``str(s)`` wrapped in the ANSI yellow (33) escape and a reset."""
    return "".join(("\x1B[33m", str(s), "\x1B[0m"))
def render_blue(s):
    """Return ``str(s)`` wrapped in the ANSI blue (34) escape and a reset."""
    return "".join(("\x1B[34m", str(s), "\x1B[0m"))
def with_doris_prefix(name):
    """Return ``name`` with DORIS_PREFIX prepended."""
    prefixed = DORIS_PREFIX + name
    return prefixed
def parse_service_name(service_name):
    """Split a name shaped like ``<DORIS_PREFIX><cluster>-<node_type>-<id>``.

    Args:
        service_name: service/container name to parse.

    Returns:
        ``(cluster_name, node_type, node_id)`` with ``node_id`` as an int,
        or ``(None, None, None)`` when the name is empty, lacks
        DORIS_PREFIX, does not end in ``-<integer>``, has no earlier dash
        delimiting the node type, or names a node type that is not in
        ``cluster.Node.TYPE_ALL``.
    """
    invalid = (None, None, None)
    if not service_name or not service_name.startswith(DORIS_PREFIX):
        return invalid

    # The numeric node id follows the last dash.
    id_sep = service_name.rfind("-")
    if id_sep < 0:
        return invalid
    try:
        node_id = int(service_name[id_sep + 1:])
    except ValueError:
        # Only a malformed number is expected here; anything else
        # (e.g. KeyboardInterrupt) must not be swallowed.
        return invalid

    # The node type sits between the previous dash and the id dash; the
    # search starts after the prefix so the prefix's own dash is ignored.
    type_sep = service_name.rfind("-", len(DORIS_PREFIX), id_sep - 1)
    if type_sep < 0:
        return invalid
    node_type = service_name[type_sep + 1:id_sep]

    # Function-scope import, as in the original (presumably to avoid a
    # circular import -- confirm). Deferred until actually needed so that
    # obviously invalid names are rejected without loading the module.
    import cluster
    if node_type not in cluster.Node.TYPE_ALL:
        return invalid
    return service_name[len(DORIS_PREFIX):type_sep], node_type, node_id
def get_map_ports(container):
    """Map each container port to the host port it is published on.

    Reads ``container.attrs["NetworkSettings"]["Ports"]``, whose keys look
    like ``"8030/tcp"`` and whose values are lists of bindings carrying a
    ``"HostPort"`` entry.

    Args:
        container: object exposing the inspect data as an ``attrs`` dict.

    Returns:
        dict of int container port -> int host port, taken from the first
        binding. Ports without any host binding are omitted.
    """
    network_settings = container.attrs.get("NetworkSettings") or {}
    ports = network_settings.get("Ports") or {}
    return {
        int(inner.replace("/tcp", "")): int(bindings[0]["HostPort"])
        for inner, bindings in ports.items()
        # Exposed-but-unpublished ports are reported with a null binding
        # list; indexing it would raise TypeError.
        if bindings
    }
def is_container_running(container):
    """Tell whether the container's ``status`` attribute equals "running"."""
    status = container.status
    return status == "running"
def get_doris_containers(cluster_names):
    """Group the doris containers known to docker by cluster name.

    Args:
        cluster_names: one cluster name (str), a collection of cluster
            names, or an empty value to select every doris container.

    Returns:
        dict mapping cluster name -> list of docker container objects.
    """
    # Normalise a single name to a one-element list. With a bare str the
    # membership test below would be a substring test, not an equality test.
    if isinstance(cluster_names, str):
        wanted = [cluster_names] if cluster_names else []
    else:
        wanted = cluster_names

    if wanted:
        filter_names = "|".join(
            "{}{}-*".format(DORIS_PREFIX, name) for name in wanted)
    else:
        filter_names = "{}*".format(DORIS_PREFIX)

    clusters = {}
    client = docker.client.from_env()
    containers = client.containers.list(filters={"name": filter_names})
    for container in containers:
        cluster_name, _, _ = parse_service_name(container.name)
        if not cluster_name:
            continue
        # Re-check against the requested names: the docker-side name filter
        # may return more than exact matches (presumably pattern matching
        # -- confirm against the docker SDK).
        if wanted and cluster_name not in wanted:
            continue
        clusters.setdefault(cluster_name, []).append(container)
    return clusters
def get_doris_running_containers(cluster_name):
    """Return ``{container name: container}`` for one cluster's running containers.

    Containers come from ``get_doris_containers(cluster_name)`` and are kept
    only when ``is_container_running`` accepts them.
    """
    running = {}
    for container in get_doris_containers(cluster_name).get(cluster_name, []):
        if is_container_running(container):
            running[container.name] = container
    return running
def is_dir_empty(dir):
    """Return True when ``os.listdir(dir)`` yields no entries, else False."""
    return not os.listdir(dir)
def exec_shell_command(command, ignore_errors=False):
    """Run ``command`` through the shell and capture its combined output.

    Args:
        command: shell command line; it is executed with ``shell=True``.
        ignore_errors: when False (the default), a non-zero exit status
            raises instead of being returned.

    Returns:
        ``(returncode, output)`` where ``output`` is stdout and stderr
        merged and decoded as UTF-8.

    Raises:
        AssertionError: the command exited non-zero and ``ignore_errors``
            is False; the exception message is the captured output.
    """
    LOG.info("Exec command: {}".format(command))
    # NOTE: shell=True -- callers must only pass trusted command strings.
    p = subprocess.Popen(command,
                         shell=True,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT)
    # errors='replace': output that is not valid UTF-8 must not turn a
    # finished command into a UnicodeDecodeError.
    out = p.communicate()[0].decode('utf-8', errors='replace')
    if not ignore_errors and p.returncode != 0:
        # Raised explicitly rather than with `assert`, which is stripped
        # under `python -O`; the exception type callers see is unchanged.
        raise AssertionError(out)
    if ENABLE_LOG and out:
        print(out)
    return p.returncode, out
def exec_docker_compose_command(compose_file,
                                command,
                                options=None,
                                nodes=None,
                                user_command=None):
    """Build and run a ``docker-compose`` command line.

    Args:
        compose_file: path given to ``docker-compose -f``.
        command: docker-compose sub-command.
        options: optional list of option strings placed after ``command``.
        nodes: optional list of objects with a ``service_name()`` method.
            ``None`` leaves the service list off the command line; an
            empty list skips execution entirely.
        user_command: optional trailing text appended to the command line.

    Returns:
        ``(0, "Skip")`` when ``nodes`` is an empty collection, otherwise
        the ``(returncode, output)`` result of ``exec_shell_command``.
    """
    # Identity test against None: only an explicitly empty collection
    # means "nothing to do".
    if nodes is not None and not nodes:
        return 0, "Skip"

    options_part = " ".join(options) if options else ""
    nodes_part = " ".join([node.service_name()
                           for node in nodes]) if nodes else ""
    user_part = user_command if user_command else ""
    compose_cmd = "docker-compose -f {} {} {} {} {}".format(
        compose_file, command, options_part, nodes_part, user_part)
    return exec_shell_command(compose_cmd)
def get_docker_subnets_prefix16():
    """Collect the ``"<octet1>.<octet2>"`` prefixes used by docker networks.

    Every subnet found under ``IPAM.Config[*].Subnet`` of each docker
    network contributes its first two dotted components. A subnet whose
    prefix length is below 16 contributes all 256 second-octet values
    under its first octet.

    Returns:
        dict whose keys are the prefix strings; every value is True.
    """
    used_prefixes = {}
    docker_client = docker.from_env()
    for network in docker_client.networks.list():
        attrs = network.attrs
        if not attrs:
            continue
        ipam = attrs.get("IPAM", None)
        if not ipam:
            continue
        configs = ipam.get("Config", None)
        if not configs:
            continue
        for config in configs:
            subnet = config.get("Subnet", None)
            if not subnet:
                continue
            # At least two dots and a non-empty first component are
            # required; anything else is skipped.
            pieces = subnet.split(".", 2)
            if len(pieces) < 3 or not pieces[0]:
                continue
            first_octet, second_octet = pieces[0], pieces[1]

            # Prefix length defaults to 16 when the subnet has no "/".
            _, slash, suffix = subnet.partition("/")
            prefix_len = int(suffix) if slash else 16

            if prefix_len >= 16:
                used_prefixes["{}.{}".format(first_octet,
                                             second_octet)] = True
            else:
                for octet in range(256):
                    used_prefixes["{}.{}".format(first_octet, octet)] = True

    LOG.debug("Get docker subnet prefixes: {}".format(used_prefixes))
    return used_prefixes
def copy_image_directory(image, image_dir, local_dir):
    """Copy ``image_dir`` from a docker image into ``local_dir`` on the host.

    Runs a self-removing container of ``image`` with ``local_dir`` mounted
    at ``/opt/mount`` and ``cp -r`` as its entrypoint.

    Args:
        image: image to run.
        image_dir: source directory inside the image.
        local_dir: host directory that receives the copy.
    """
    docker_client = docker.from_env()
    mounts = ["{}:/opt/mount".format(local_dir)]

    # Make the source path end in "." (typically "<dir>/.") before it is
    # handed to cp.
    if image_dir.endswith("/"):
        source = image_dir + "."
    elif image_dir.endswith("."):
        source = image_dir
    else:
        source = image_dir + "/."

    copy_entrypoint = "cp -r {} /opt/mount/".format(source)
    docker_client.containers.run(image,
                                 remove=True,
                                 volumes=mounts,
                                 entrypoint=copy_entrypoint)
def enable_dir_with_rw_perm(dir):
    """Recursively grant read/write permission on ``dir`` to all users.

    Does nothing when ``dir`` does not exist. Otherwise runs
    ``chmod a+rw -R`` inside a self-removing ``ubuntu`` container that has
    ``dir`` mounted at ``/opt/mount``.
    """
    if os.path.exists(dir):
        mount_point = "/opt/mount"
        docker_client = docker.client.from_env()
        docker_client.containers.run(
            "ubuntu",
            remove=True,
            volumes=["{}:/opt/mount".format(dir)],
            entrypoint="chmod a+rw -R {}".format(mount_point))
def get_path_owner(path):
    """Return the user name owning ``path``, or ``""`` when it is unknown.

    Args:
        path: filesystem path to inspect.

    Returns:
        The ``pw_name`` of the owning uid. An empty string is returned
        when the path cannot be stat'ed, is not a usable path value, or
        the uid has no passwd entry.
    """
    try:
        return pwd.getpwuid(os.stat(path).st_uid).pw_name
    except (OSError, KeyError, TypeError, ValueError):
        # OSError: stat failed (missing path, permission denied, ...).
        # KeyError: the uid has no passwd entry.
        # TypeError/ValueError: malformed path argument.
        # Best-effort lookup, but never swallow KeyboardInterrupt/SystemExit.
        return ""
def read_compose_file(file):
    """Read the file at ``file`` and return its content parsed by ``yaml.safe_load``."""
    with open(file, "r") as stream:
        content = stream.read()
    return yaml.safe_load(content)
def write_compose_file(file, compose):
    """Serialise ``compose`` with ``yaml.dump`` and write it to ``file``.

    Args:
        file: destination path; an existing file is overwritten.
        compose: object to serialise.
    """
    # Serialise before opening: open(..., "w") truncates immediately, so a
    # failing yaml.dump would otherwise leave an empty compose file behind.
    content = yaml.dump(compose)
    with open(file, "w") as f:
        f.write(content)
def pretty_json(json_data):
    """Render ``json_data`` as JSON text with sorted keys and 4-space indentation."""
    rendered = json.dumps(json_data, sort_keys=True, indent=4)
    return rendered
def is_true(val):
    """Interpret ``val`` as a boolean flag.

    Args:
        val: any value; it is compared through its ``str()`` form.

    Returns:
        True when ``str(val)`` is ``"1"`` or spells ``true`` in any letter
        case, otherwise False.
    """
    # Case-insensitive so the Python bool True (str() -> "True") and
    # spellings such as "TRUE" count as true, alongside "true" and "1".
    return str(val).lower() in ("true", "1")
def escape_null(val):
    """Return ``""`` when ``val`` equals the two-character marker ``\\N``; otherwise return ``val`` unchanged."""
    if val == "\\N":
        return ""
    return val