Support streaming disaster recovery feature

chenzhaoliang1228
2022-08-10 11:16:36 +08:00
committed by Your Name
parent c41f4de4ea
commit aa970803c3
34 changed files with 5765 additions and 939 deletions


@ -17,6 +17,7 @@
# ----------------------------------------------------------------------------
# Description : Common is a utility with a lot of common functions
#############################################################################
import ctypes
import sys
import subprocess
import os
@ -28,6 +29,7 @@ import time
import multiprocessing
import _thread as thread
import pwd
import json
import base64
import secrets
import string
@ -35,6 +37,7 @@ import stat
import csv
import copy
from subprocess import PIPE
from subprocess import Popen
# The installation starts, but the package is not decompressed completely.
# The lib64/libz.so.1 file is incomplete, and the hashlib depends on the
@ -106,6 +109,7 @@ from base_utils.os.cmd_util import CmdUtil
from base_utils.os.env_util import EnvUtil
from base_utils.os.file_util import FileUtil
from domain_utils.cluster_file.version_info import VersionInfo
from domain_utils.cluster_file.cluster_dir import ClusterDir
from domain_utils.security.random_value import RandomValue
from base_utils.os.process_util import ProcessUtil
from domain_utils.sql_handler.sql_executor import SqlExecutor
@ -199,6 +203,7 @@ class DefaultValue():
FILE_MODE = 640
FILE_MODE_PERMISSION = 0o640
KEY_FILE_MODE = 600
KEY_FILE_MODE_IN_OS = 0o600
MIN_FILE_MODE = 400
SPE_FILE_MODE = 500
KEY_DIRECTORY_MODE = 700
@ -318,6 +323,9 @@ class DefaultValue():
# FI_ELK_KRB_XML is used in elk
FI_ELK_KRB_XML = "auth_config/elk-krb-site.xml"
FI_KRB_CONF = "krb5.conf"
# cluster status
CLUSTER_STATUS_NORMAL = "Normal"
CLUSTER_STATUS_DEGRADED = "Degraded"
###########################
# instance role
###########################
@ -615,6 +623,60 @@ class DefaultValue():
return NetWorkConfFile
@staticmethod
def get_remote_ips(host, mpp_file):
"""
Get IP addresses from a remote host
"""
cmd = "source %s && pssh -s -t 30 -H %s \"hostname -I\"" % (mpp_file, host)
status, output = subprocess.getstatusoutput(cmd)
if status == 0 and output != "":
ips = output.strip().split()
return ips
else:
raise Exception(ErrorCode.GAUSS_516['GAUSS_51632']
% "check remote ips for node:%s, Error:%s." % (host, output))
@staticmethod
def obtain_file_content(dest_file, deduplicate=True, is_list=True):
"""
function: obtain the content of each line in the file.
input: file path
:return: list of file content lines, or a stripped string when is_list is False
"""
result = [] if is_list else None
if not os.path.isfile(dest_file):
return result
with open(dest_file, "r") as fp_read:
if is_list:
for line in fp_read:
result.append(line.strip('\n'))
else:
result = fp_read.read().strip()
if deduplicate and is_list:
result = list(set(result))
return result
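# Usage sketch (illustrative, hypothetical path): read a line list with
# duplicates dropped, or the whole file as one stripped string; note that
# deduplication via set() does not preserve line order:
#   hosts = DefaultValue.obtain_file_content("/tmp/hosts.list")
#   raw = DefaultValue.obtain_file_content("/tmp/hosts.list", is_list=False)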
@staticmethod
def get_all_dn_num_for_dr(file_path, dn_inst, cluster_info, logger):
"""get_all_dn_num_for_dr_cluster"""
# DN inst supports a maximum of replicaNum=8 in postgresql.conf.
default_num = 8
content = DefaultValue.obtain_file_content(file_path, is_list=False)
if content:
default_num = 0
shards = json.loads(content)['remoteClusterConf']["shards"]
logger.debug("Stream cluster json shards:%s" % shards)
if cluster_info.isSingleInstCluster():
for shard in shards:
default_num += len(shard)
else:
default_num += len(shards[0])
peer_insts = cluster_info.getPeerInstance(dn_inst)
default_num += len(peer_insts)
logger.debug("Get config replconninfo dn num:%s" % default_num)
return default_num
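# Counting sketch: for a single-instance (centralized) cluster every member
# of every remote shard gets a replconninfo slot; for a distributed cluster
# only the first remote shard is counted. The local peer instances of
# dn_inst are added on top.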
@staticmethod
def getIpByHostName():
'''
@ -1616,6 +1678,45 @@ class DefaultValue():
noPassIPs.append(ip)
g_lock.release()
@staticmethod
def fast_ping(node_ip):
"""
ping node with short timeout
"""
cmd = "ping %s -c 1 -w 4" % node_ip
proc = FastPopen(cmd, stdout=PIPE, stderr=PIPE, preexec_fn=os.setsid, close_fds=True)
proc.communicate()
status = proc.returncode
result = (node_ip, True) if status == 0 else (node_ip, False)
return result
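# Usage sketch (illustrative): the 1-packet, 4-second-deadline ping keeps a
# dead node from blocking callers, and the (ip, bool) result shape suits a
# hypothetical multiprocessing pool:
#   reachable = dict(pool.map(DefaultValue.fast_ping, ip_list))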
@staticmethod
def fast_ping_on_node(on_node, from_ip, to_ip, logger):
"""
Ping from a remote node, binding the source address with -I
"""
cmd = "ping %s -c 1 -w 4" % on_node
proc = FastPopen(cmd, stdout=PIPE, stderr=PIPE,
preexec_fn=os.setsid, close_fds=True)
proc.communicate()
status = proc.returncode
if status != 0:
logger.debug("Node:%s ping failed, can not execute remote check." % on_node)
return on_node, False
if on_node == NetUtil.GetHostIpOrName():
cmd_remote = "ping %s -I %s -c 1 -w 4" % (to_ip, from_ip)
else:
cmd_remote = "source %s && pssh -s -H %s 'ping %s -I %s -c 1 -w 4'" \
% (EnvUtil.getMpprcFile(), on_node, to_ip, from_ip)
proc = FastPopen(cmd_remote, stdout=PIPE, stderr=PIPE,
preexec_fn=os.setsid, close_fds=True)
proc.communicate()
status = proc.returncode
result = (to_ip, True) if status == 0 else (to_ip, False)
logger.debug("Remote ping result on node:%s, from ip:%s, to ip:%s, result:%s."
% (on_node, from_ip, to_ip, result))
return result
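# Usage sketch (illustrative, hypothetical addresses): check from "nodeA"
# whether its data ip can reach a peer data ip:
#   ip, ok = DefaultValue.fast_ping_on_node("nodeA", "192.168.0.1",
#                                           "192.168.0.2", logger)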
@staticmethod
def checkIsPing(ips):
"""
@ -2259,7 +2360,7 @@ class DefaultValue():
"Command:%s. Error:\n%s" % (cmd, output))
targetString = output.split("Datanode")[1]
dnPrimary = [x for x in re.split(r"[|\n]", targetString)
if flagStr in x]
if flagStr in x or "Main" in x]
primaryList = []
for dn in dnPrimary:
primaryList.append(list(filter(None, dn.split(" ")))[1])
@ -2866,6 +2967,283 @@ class DefaultValue():
"on node [{0}] successfully.".format(node.name))
logger.log("Remove dynamic_config_file and CM metadata directory on all nodes.")
@staticmethod
def distribute_file_to_node(params):
"""
Distribute a file to the destination node path
"""
dest_ip, from_path, to_path, timeout = params
pscp_cmd = "source %s ; pscp -t %s -H %s %s %s" % (
EnvUtil.getMpprcFile(), timeout, dest_ip, from_path, to_path)
status, output = CmdUtil.getstatusoutput_by_fast_popen(pscp_cmd)
return status, output, dest_ip
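# Usage sketch (illustrative, hypothetical values): params is a single
# (dest_ip, from_path, to_path, timeout) tuple so the method can be mapped
# over many destination nodes:
#   status, output, ip = DefaultValue.distribute_file_to_node(
#       ("10.0.0.2", "/tmp/a.conf", "/tmp/", 30))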
@staticmethod
def check_is_cm_cluster(logger):
"""
Check cm_ctl is exist.
"""
cmd = "source %s; cm_ctl view | grep cmDataPath" % EnvUtil.getMpprcFile()
status, output = CmdUtil.retryGetstatusoutput(cmd)
if status != 0:
logger.debug("Check cm_ctl is failed msg: %s." % output)
return False
logger.debug("Successfully check cm_ctl is available.")
return True
@staticmethod
def is_disaster_cluster(clusterinfo):
"""
function: determine whether this is a disaster recovery cluster
input: cluster info
output: bool
"""
cmd = "source %s; cm_ctl view | grep cmDataPath | awk -F [:] '{print $2}' | head -n 1" % \
EnvUtil.getMpprcFile()
proc = FastPopen(cmd, stdout=PIPE, stderr=PIPE)
stdout, stderr = proc.communicate()
if proc.returncode != 0:
raise Exception(ErrorCode.GAUSS_514['GAUSS_51400'] % cmd + "Error:\n%s" % stderr)
cm_agent_conf_file = stdout.strip() + "/cm_agent/cm_agent.conf"
if not os.path.isfile(cm_agent_conf_file):
host_list = clusterinfo.getClusterNodeNames()
cm_agent_conf_temp_file = os.path.join(EnvUtil.getTmpDirFromEnv(), "cm_agent_tmp.conf")
for host_ip in host_list:
get_file_cmd = g_file.SHELL_CMD_DICT["scpFileFromRemote"] % \
(host_ip, NetUtil.GetHostIpOrName(), cm_agent_conf_file, cm_agent_conf_temp_file)
proc = FastPopen(get_file_cmd, stdout=PIPE, stderr=PIPE)
stdout, stderr = proc.communicate()
if not os.path.isfile(cm_agent_conf_temp_file):
continue
else:
break
if os.path.isfile(cm_agent_conf_temp_file):
with open(cm_agent_conf_temp_file, "r") as cma_conf_file:
content = cma_conf_file.read()
ret = re.findall(r'agent_backup_open *= *1|agent_backup_open *= *2', content)
g_file.removeFile(cm_agent_conf_temp_file)
if ret:
return True
else:
return False
else:
raise Exception(ErrorCode.GAUSS_502['GAUSS_50201'] % cm_agent_conf_file)
with open(cm_agent_conf_file, "r") as cma_conf_file:
content = cma_conf_file.read()
ret = re.findall(r'agent_backup_open *= *1|agent_backup_open *= *2', content)
if ret:
return True
else:
return False
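# Note: in cm_agent.conf, agent_backup_open = 1 or 2 is what the regex above
# treats as a disaster recovery (backup) cluster; any other value, or the
# absence of the parameter, is treated as a normal cluster.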
@staticmethod
def cm_exist_and_is_disaster_cluster(clusterinfo, logger):
"""
Check whether CM exists in the current cluster and it is a disaster cluster.
"""
cm_exist = DefaultValue.check_is_cm_cluster(logger)
if not cm_exist:
return False
is_disaster = DefaultValue.is_disaster_cluster(clusterinfo)
if not is_disaster:
return False
return True
@staticmethod
def write_content_on_file(dest_file, content, authority=None):
"""
Write content to a file
"""
authority = authority if authority else DefaultValue.KEY_FILE_MODE_IN_OS
with os.fdopen(os.open(dest_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
authority), "w") as fp_write:
fp_write.write(str(content))
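# Usage sketch (illustrative, hypothetical path): files are created with
# mode 0o600 by default so recorded secrets stay owner-readable only:
#   DefaultValue.write_content_on_file("/tmp/record.json", json.dumps(conf))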
@staticmethod
def get_data_ip_info(instance, logger):
"""
Obtain the data ip of an instance from the record file.
"""
cluster_conf_record = os.path.join(EnvUtil.getEnv("PGHOST"),
"streaming_cabin/cluster_conf_record")
if not os.path.isfile(cluster_conf_record):
raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] % cluster_conf_record)
with open(cluster_conf_record, 'r') as read_fp:
conf_dict = json.load(read_fp)
if not conf_dict or len(conf_dict) != 2:
logger.debug("Failed obtain data ip list.")
raise Exception(ErrorCode.GAUSS_516["GAUSS_51632"] % "check data ip file")
inst_data_ip = ""
local_shards_list = conf_dict["localClusterConf"]["shards"]
for shard_list in local_shards_list:
for shard in shard_list:
if shard["ip"] not in instance.listenIps:
continue
inst_data_ip = shard["dataIp"]
logger.debug("File record:%s, \nGot data ip:%s for instanceId:%s." %
(conf_dict, inst_data_ip, instance.instanceId))
if not inst_data_ip:
raise Exception(ErrorCode.GAUSS_516["GAUSS_51632"] % "obtain local data ip")
return inst_data_ip
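# Illustrative shape of cluster_conf_record (hypothetical addresses): each
# shard entry maps a listen ip to the dataIp used for DR replication:
#   {"localClusterConf": {"shards": [[{"ip": "10.0.0.1", "dataIp": "192.168.0.1"}]]},
#    "remoteClusterConf": {"shards": [[{"ip": "10.0.1.1", "dataIp": "192.168.1.1"}]]}}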
@staticmethod
def obtain_hadr_user_encrypt_str(cluster_info, db_user, logger, mode, ignore_res=False):
"""
Obtain hadr user encrypted string
"""
sql = "select value from gs_global_config where name='hadr_user_info';"
instances = []
for node in cluster_info.dbNodes:
if cluster_info.isSingleInstCluster():
for inst in node.datanodes:
instances.append(inst)
for inst in instances:
logger.debug("Obtain hadr user info string on node:%s with port:%s."
% (inst.hostname, inst.port))
status, output = ClusterCommand.remoteSQLCommand(sql, db_user, inst.hostname,
inst.port, maintenance_mode=mode)
if status == 0 and output:
logger.debug("Successfully obtain hadr user info string.")
return output
if ignore_res:
return
logger.debug("Failed obtain hadr user info string.")
raise Exception(ErrorCode.GAUSS_516["GAUSS_51632"] % "obtain hadr user info")
@staticmethod
def getstatusoutput_hide_pass(joint_cmd):
"""
Run the command through the shell's stdin so passwords in it do not appear in the process list.
"""
proc = Popen(["sh", "-"], stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
stdout, stderr = proc.communicate(joint_cmd)
text = stderr or stdout
sts = proc.returncode
if sts is None:
sts = 0
if text and text[-1:] == '\n':
text = text[:-1]
return sts, text
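# Usage sketch (illustrative, hypothetical command): the command is passed to
# `sh` on stdin instead of argv, so an embedded password is not visible in
# `ps` output:
#   sts, text = DefaultValue.getstatusoutput_hide_pass("some_tool --password '***'")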
@staticmethod
def decrypt_hadr_user_info(params):
"""
Decrypt hadr user info
"""
if len(params) != 6:
raise Exception(ErrorCode.GAUSS_500["GAUSS_50000"] % "decrypt hadr user info")
rand_pwd, hadr_str, cluster_info, db_user, logger, mode = params
sql = "select pg_catalog.gs_decrypt_aes128('%s', '%s');" % (hadr_str, rand_pwd)
instances = []
for node in cluster_info.dbNodes:
if cluster_info.isSingleInstCluster():
for inst in node.datanodes:
instances.append(inst)
else:
for inst in node.coordinators:
instances.append(inst)
for inst in instances:
logger.debug("Decrypt hadr user info on node:%s with port:%s."
% (inst.hostname, inst.port))
status, output = ClusterCommand.remoteSQLCommand(sql, db_user, inst.hostname,
inst.port, maintenance_mode=mode)
if status == 0 and output and "|" in output and len(output.split("|")) == 2:
logger.debug("Successfully decrypt hadr user info string.")
hadr_user, hadr_pwd = output.strip().split("|")[0], output.strip().split("|")[1]
return hadr_user, hadr_pwd
logger.debug("Failed decrypt hadr user info string.")
raise Exception(ErrorCode.GAUSS_516["GAUSS_51632"] % "decrypt hadr user info")
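# Note: the decrypted payload is expected to be "user|password"; the checks
# above require exactly one '|' separator before splitting.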
@staticmethod
def decrypt_hadr_rand_pwd(logger):
"""
Decrypt hadr rand pwd
"""
db_user = pwd.getpwuid(os.getuid()).pw_name
gauss_home = ClusterDir.getInstallDir(db_user)
bin_path = os.path.join(os.path.realpath(gauss_home), "bin")
if not os.path.isdir(bin_path):
logger.debug("Failed to obtain bin path.")
raise Exception(ErrorCode.GAUSS_518["GAUSS_51802"] % "bin path")
cipher_file = os.path.join(EnvUtil.getTmpDirFromEnv(), "binary_upgrade/hadr.key.cipher")
rand_file = os.path.join(EnvUtil.getTmpDirFromEnv(), "binary_upgrade/hadr.key.rand")
if os.path.isfile(cipher_file) and os.path.isfile(rand_file):
bin_path = os.path.join(EnvUtil.getTmpDirFromEnv(), "binary_upgrade")
rand_pwd = AesCbcUtil.aes_cbc_decrypt_with_path(bin_path, bin_path, key_name="hadr")
if rand_pwd:
logger.debug("Successfully decrypt rand pwd.")
return rand_pwd
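# Note: when hadr.key.cipher/hadr.key.rand are absent or decryption yields
# nothing, this method falls through and implicitly returns None, so callers
# must handle a missing rand pwd.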
@staticmethod
def get_proc_title(pwd_para_name):
"""
Obtain the process name after sensitive information is hidden.
"""
cmd = "cat /proc/%s/cmdline" % os.getpid()
status, output = CmdUtil.retryGetstatusoutput(cmd)
if status != 0 or not output:
raise Exception(ErrorCode.GAUSS_502["GAUSS_50219"] % "proc title" + " Cmd is:%s." % cmd)
title_str_list = []
for title_str in output.split("\0"):
if "=" in title_str:
title_str_list.extend(title_str.split("="))
else:
title_str_list.extend(title_str.split(" "))
if pwd_para_name in title_str_list:
w_index = title_str_list.index(pwd_para_name)
title_str_list[w_index], title_str_list[w_index + 1] = "", ""
title_name = " ".join(title_str_list).strip()
return title_name
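# Usage sketch (illustrative, hypothetical parameter name): if the process
# was started with "... -W secret", then
#   DefaultValue.get_proc_title("-W")
# returns the cmdline with the "-W" flag and its value blanked out.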
@staticmethod
def set_proc_title(name):
"""
set proc title to new name
"""
new_name = name.encode('ascii', 'replace')
try:
libc = ctypes.CDLL('libc.so.6')
proc_name = ctypes.c_char_p.in_dll(libc, '__progname_full')
with open('/proc/self/cmdline') as fp:
old_progname_len = len(fp.readline())
if old_progname_len > len(new_name):
# pad with blank chars to keep the original cmdline length
new_name += b' ' * (old_progname_len - len(new_name))
# Environment variables are already copied to Python app zone.
# We can get environment variables by `os.environ` module,
# so we can ignore the destroying from the following action.
libc.strcpy(proc_name, ctypes.c_char_p(new_name))
buff = ctypes.create_string_buffer(len(new_name) + 1)
buff.value = new_name
libc.prctl(15, ctypes.byref(buff), 0, 0, 0)
except Exception as err_msg:
raise Exception(ErrorCode.GAUSS_505["GAUSS_50503"] + str(err_msg))
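# Note: prctl option 15 is PR_SET_NAME (kernel task name, truncated to 16
# bytes), while the strcpy over __progname_full rewrites the argv[0] buffer
# shown by `ps`. A typical pairing with the helper above is a sketch like:
#   DefaultValue.set_proc_title(DefaultValue.get_proc_title("-W"))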
@staticmethod
def check_is_streaming_dr_cluster():
"""check_is_steaming_cluster_cluster"""
stream_file = os.path.realpath(os.path.join(EnvUtil.getEnv("PGHOST"), "streaming_cabin"))
if os.path.exists(stream_file):
sys.exit(ErrorCode.GAUSS_512["GAUSS_51244"] % "current operation on dr cluster")
@staticmethod
def get_primary_dn_instance_id(inst_status="Primary", ignore=False):
"""
function: get Primary/Standby dn instance ids for a centralized/distributed cluster
:param inst_status: Primary/Standby
:return: instance id list
"""
cmd = r"source %s; cm_ctl query -v | grep -E 'instance_state\ *:\ %s' " \
r"-B 4 | grep -E 'type\ *:\ Datanode' -B 5 | grep instance_id | awk " \
r"'{print $NF}'" % (EnvUtil.getMpprcFile(), inst_status)
(status, output) = CmdUtil.retryGetstatusoutput(cmd)
if status != 0 or not output:
if ignore is True:
return []
raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"] %
cmd + " Error: \n%s" % output)
return output.strip().split('\n')
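# Usage sketch (illustrative): collect primary DN instance ids, tolerating an
# empty result (e.g. while the cluster is starting) via ignore=True:
#   primary_ids = DefaultValue.get_primary_dn_instance_id("Primary", ignore=True)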
@staticmethod
def isgreyUpgradeNodeSpecify(user, step=-1, nodes=None, logger=None):
"""
@ -2988,6 +3366,29 @@ class ClusterCommand():
# rollback to flag of start cluster
INSTALL_STEP_START = "Start cluster"
@staticmethod
def getStartCmd(nodeId=0, timeout=DefaultValue.TIMEOUT_CLUSTER_START, datadir="", azName=""):
"""
function : Start the whole cluster or a single node
input : int, int, String, String
output : String
"""
user_profile = EnvUtil.getMpprcFile()
cmd = "%s %s ; cm_ctl start" % (CmdUtil.SOURCE_CMD, user_profile)
# check node id
if nodeId > 0:
cmd += " -n %d" % nodeId
# check data directory
if datadir != "":
cmd += " -D %s" % datadir
# check timeout
if timeout > 0:
cmd += " -t %d" % timeout
# azName
if azName != "":
cmd += " -z%s" % azName
return cmd
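# Example of a generated command (hypothetical values):
#   getStartCmd(nodeId=2, timeout=300, azName="AZ1")
# yields "source <mpprc_file> ; cm_ctl start -n 2 -t 300 -zAZ1".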
@staticmethod
def getStopCmd(nodeId=0, stopMode="", timeout=0, datadir="", azName=""):
@ -3152,7 +3553,8 @@ class ClusterCommand():
@staticmethod
def remoteSQLCommand(sql, user, host, port, ignoreError=True,
database="postgres", useTid=False,
IsInplaceUpgrade=False):
IsInplaceUpgrade=False, maintenance_mode=False,
user_name="", user_pwd=""):
"""
function : Execute sql command on remote host
input : String,String,String,int
@ -3220,7 +3622,10 @@ class ClusterCommand():
gsql_cmd = SqlCommands.getSQLCommandForInplaceUpgradeBackup(
port, database)
else:
gsql_cmd = SqlCommands.getSQLCommand(port, database)
gsql_cmd = SqlCommands.getSQLCommand(port, database, user_name=user_name,
user_pwd=user_pwd)
if maintenance_mode:
gsql_cmd += " -m "
if str(localHost) != str(host):
sshCmd = CmdUtil.getSshCmd(host)
if os.getuid() == 0 and user != "":
@ -3233,16 +3638,24 @@ class ClusterCommand():
if ignoreError:
cmd += " 2>/dev/null"
else:
cmd = "%s '" % sshCmd
cmd = ""
if mpprcFile != "" and mpprcFile is not None:
cmd += "source %s;" % mpprcFile
cmd += "%s -f %s --output %s -t -A -X '" % (gsql_cmd,
cmd += "%s -f %s --output %s -t -A -X " % (gsql_cmd,
sqlFile,
queryResultFile)
if user_pwd:
cmd = "echo \"%s\" | %s" % (cmd, sshCmd)
else:
cmd = "%s '%s'" % (sshCmd, cmd)
if ignoreError:
cmd += " 2>/dev/null"
for i in range(RE_TIMES):
(status1, output1) = subprocess.getstatusoutput(cmd)
proc = FastPopen(cmd, stdout=PIPE, stderr=PIPE,
preexec_fn=os.setsid, close_fds=True)
stdout, stderr = proc.communicate()
output1 = stdout + stderr
status1 = proc.returncode
if SqlFile.findErrorInSqlFile(sqlFile, output1):
if SqlFile.findTupleErrorInSqlFile(output1):
time.sleep(1) # find tuple error --> retry
@ -3278,7 +3691,11 @@ class ClusterCommand():
if (ignoreError):
cmd += " 2>/dev/null"
for i in range(RE_TIMES):
(status1, output1) = subprocess.getstatusoutput(cmd)
proc = FastPopen(cmd, stdout=PIPE, stderr=PIPE,
preexec_fn=os.setsid, close_fds=True)
stdout, stderr = proc.communicate()
output1 = stdout + stderr
status1 = proc.returncode
if SqlFile.findErrorInSqlFile(sqlFile, output1):
if SqlFile.findTupleErrorInSqlFile(output1):
time.sleep(1) # find tuple error --> retry
@ -3778,6 +4195,83 @@ class ClusterInstanceConfig():
return connInfo1, nodename
@staticmethod
def get_data_from_dcc(cluster_info, logger, user, paralist):
"""
function: get value from dcc
:param cluster_info: cluster info
:param logger: logger obj
:param user: cluster user
:param paralist: list of parameter keys to query
:return: key-value map dict
"""
gausshome = ClusterDir.getInstallDir(user)
cm_ctl = os.path.realpath(os.path.join(gausshome, "bin/cm_ctl"))
if not os.path.isfile(cm_ctl):
raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] % "file cm_ctl")
cms_count = 0
etcd_count = 0
for dbnode in cluster_info.dbNodes:
for _ in dbnode.cmservers:
cms_count += 1
for _ in dbnode.etcds:
etcd_count += 1
if cms_count == 0 or etcd_count > 1:
raise Exception(ErrorCode.GAUSS_500["GAUSS_50011"] % paralist)
para_value_map = {}
for para_key in paralist:
cmd = "source %s; %s ddb --get '%s'" % (EnvUtil.getMpprcFile(), cm_ctl, para_key)
logger.debug("Get dcc value cmd:%s." % cmd)
(status, output) = subprocess.getstatusoutput(cmd)
if status != 0:
raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"] % cmd + " Error:%s" % output)
logger.debug("Get dcc value:%s." % output)
res = output.strip("\n").split("\n")
if len(res) != 2:
raise Exception(ErrorCode.GAUSS_500["GAUSS_50019"] % res)
if res[-1].find("Key not found") > -1:
para_value_map[para_key] = ""
continue
para_value_map[para_key] = res[-1].split(":")[-1].strip()
logger.debug("Get all values from dcc component res:%s." % para_value_map)
return para_value_map
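# Usage sketch (illustrative, hypothetical key): read DR state flags stored
# in the DCC (distributed configuration center) through cm_ctl ddb:
#   values = ClusterInstanceConfig.get_data_from_dcc(
#       cluster_info, logger, "omm", ["stream_dr_state"])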
@staticmethod
def set_data_on_dcc(cluster_info, logger, user, paradict):
"""
function: set data on dcc
:param cluster_info: cluster info
:param logger: logger obj
:param user: cluster user
:param paradict: dict of key-value pairs to set
:return: NA
"""
gausshome = ClusterDir.getInstallDir(user)
cm_ctl = os.path.realpath(os.path.join(gausshome, "bin/cm_ctl"))
if not os.path.isfile(cm_ctl):
raise Exception(ErrorCode.GAUSS_502["GAUSS_50201"] % "file cm_ctl")
cms_count = 0
etcd_count = 0
for dbnode in cluster_info.dbNodes:
for _ in dbnode.cmservers:
cms_count += 1
for _ in dbnode.etcds:
etcd_count += 1
if cms_count == 0 or etcd_count > 1:
raise Exception(ErrorCode.GAUSS_500["GAUSS_50011"] % paradict)
for para_key in list(paradict.keys()):
cmd = "source %s; %s ddb --put '%s' '%s'" % \
(EnvUtil.getMpprcFile(), cm_ctl, para_key, paradict[para_key])
logger.debug("Set dcc value cmd:%s." % cmd)
(status, output) = subprocess.getstatusoutput(cmd)
if status != 0:
raise Exception(ErrorCode.GAUSS_514["GAUSS_51400"] % cmd + " Error:%s" % output)
logger.debug("Set dcc data:%s." % output)
res = output.strip("\n").split("\n")
if len(res) != 2:
raise Exception(ErrorCode.GAUSS_500["GAUSS_50019"] % res)
logger.debug("Successfully set the dcc data information.")
class TempfileManagement():
"""