add logic about read uwal_config from xml

This commit is contained in:
whytao
2023-12-26 14:08:03 +08:00
parent 73f0ca11cb
commit 94a981e0ce
4 changed files with 107 additions and 0 deletions

View File

@ -133,6 +133,10 @@ from base_utils.os.user_util import UserUtil
noPassIPs = []
g_lock = thread.allocate_lock()
# uwal num
BASE_ID_GTM = 1001
BASE_ID_DATANODE = 6001
def check_content_key(content, key):
if not (type(content) == bytes):
@ -3973,6 +3977,20 @@ class ClusterInstanceConfig():
(mpprcFile, datadir, gucstr)
DefaultValue.retry_gs_guc(cmd)
@staticmethod
def setUwalRepConninfo(dbInst, connInfo):
"""
function: Construct uwal replconninfo
input: dbInst, connInfo
output: connInfo
"""
remotenodeid = int(dbInst.instanceId) - BASE_ID_DATANODE
remoteuwalhost = dbInst.uwal_ip
remoteuwalport = dbInst.port + BASE_ID_GTM
connInfo += " remotenodeid=%d remoteuwalhost=%s remoteuwalport=%d" % \
(remotenodeid, remoteuwalhost, remoteuwalport)
return connInfo
@staticmethod
def setReplConninfo(dbInst, peerInsts, clusterInfo):
"""
@ -4050,6 +4068,8 @@ class ClusterInstanceConfig():
standbyInst.haIps[i],
standbyInst.haPort, (standbyInst.port +
standbyDataNum * 4))
if standbyInst.uwal_ip != "":
connInfo1 = ClusterInstanceConfig.setUwalRepConninfo(standbyInst, connInfo1)
if dummyStandbyInst is not None:
if (i > 0):
connInfo2 += ","
@ -4061,6 +4081,8 @@ class ClusterInstanceConfig():
dummyStandbyInst.haIps[i],
dummyStandbyInst.haPort,
(dummyStandbyInst.port + dummyDataNum * 4))
if dummyStandbyInst.uwal_ip != "":
connInfo2 = ClusterInstanceConfig.setUwalRepConninfo(dummyStandbyInst, connInfo2)
elif dbInst.instanceType == DefaultValue.STANDBY_INSTANCE:
if i > 0:
connInfo1 += ","
@ -4071,6 +4093,8 @@ class ClusterInstanceConfig():
(dbInst.port + standbyDataNum * 4),
masterInst.haIps[i], masterInst.haPort,
(masterInst.port + masterDataNum * 4))
if masterInst.uwal_ip != "":
connInfo1 = ClusterInstanceConfig.setUwalRepConninfo(masterInst, connInfo1)
if (dummyStandbyInst is not None):
if i > 0:
connInfo2 += ","
@ -4082,6 +4106,8 @@ class ClusterInstanceConfig():
dummyStandbyInst.haIps[i],
dummyStandbyInst.haPort,
(dummyStandbyInst.port + dummyDataNum * 4))
if dummyStandbyInst.uwal_ip != "":
connInfo2 = ClusterInstanceConfig.setUwalRepConninfo(dummyStandbyInst, connInfo2)
elif (dbInst.instanceType == DefaultValue.DUMMY_STANDBY_INSTANCE):
if i > 0:
connInfo1 += ","
@ -4092,6 +4118,8 @@ class ClusterInstanceConfig():
masterInst.haIps[i],
masterInst.haPort,
(masterInst.port + masterDataNum * 4))
if masterInst.uwal_ip != "":
connInfo1 = ClusterInstanceConfig.setUwalRepConninfo(masterInst, connInfo1)
if i > 0:
connInfo2 += ","
connInfo2 += "localhost=%s localport=%d " \
@ -4101,6 +4129,8 @@ class ClusterInstanceConfig():
(dbInst.port + dummyDataNum * 4),
standbyInst.haIps[i], standbyInst.haPort,
(standbyInst.port + standbyDataNum * 4))
if standbyInst.uwal_ip != "":
connInfo2 = ClusterInstanceConfig.setUwalRepConninfo(standbyInst, connInfo2)
return connInfo1, connInfo2, dummyStandbyInst, nodename
@ -4187,6 +4217,8 @@ class ClusterInstanceConfig():
(dbInst.port + 4), pj.haIps[i],
pj.haPort, pj.port + 5,
pj.port + 4)
if pj.uwal_ip != "":
chanalInfo = ClusterInstanceConfig.setUwalRepConninfo(pj, chanalInfo)
if pj.instanceType == DefaultValue.CASCADE_STANDBY:
chanalInfo += " iscascade=true"
@ -4211,6 +4243,8 @@ class ClusterInstanceConfig():
(dbInst.port + 4), pj.haIps[i],
pj.haPort, pj.port + 5,
(pj.port + 4))
if pj.uwal_ip != "":
chanalInfo = ClusterInstanceConfig.setUwalRepConninfo(pj, chanalInfo)
if pj.instanceType == DefaultValue.CASCADE_STANDBY:
chanalInfo += " iscascade=true"
connInfo1.append(chanalInfo)

View File

@ -618,6 +618,8 @@ class instanceInfo():
self.cascadeRole = "off"
# dcf_data_path
self.dcf_data_path = ""
# uwal_ip
self.uwal_ip = ""
def __cmp__(self, target):
"""
@ -716,6 +718,8 @@ class dbNodeInfo():
self.standbyDnNum = 0
self.dummyStandbyDnNum = 0
self.cascadeRole = "off"
# enable_uwal
self.enable_uwal = ""
def __cmp__(self, target):
"""
@ -813,6 +817,8 @@ class dbNodeInfo():
dbInst.haIps = self.backIps[:]
else:
dbInst.haIps = haIps[:]
if self.enable_uwal == "on":
dbInst.uwal_ip = self.backIps[-1]
# cm_server
if (instRole == INSTANCE_ROLE_CMSERVER):
dbInst.datadir = os.path.join(self.cmDataDir, "cm_server")
@ -996,6 +1002,15 @@ class dbClusterInfo():
self.remote_stream_ip_map = []
self.remote_dn_base_port = 0
self.local_dn_base_port = 0
# add for uwal
self.enable_uwal = ""
self.uwal_disk_size = ""
self.uwal_devices_path = ""
self.uwal_log_path = ""
self.uwal_rpc_compression_switch = ""
self.uwal_rpc_flowcontrol_switch = ""
self.uwal_rpc_flowcontrol_value = ""
self.uwal_async_append_switch = ""
def __str__(self):
"""
@ -1042,6 +1057,25 @@ class dbClusterInfo():
DssSimpleChecker.check_dss_some_param(self)
def init_uwal_config(self, xml_entiy):
'''
init uwal input parameter
'''
_, self.uwal_disk_size = ClusterConfigFile.readOneClusterConfigItem(
xml_entiy, "uwal_disk_size", "cluster")
_, self.uwal_devices_path = ClusterConfigFile.readOneClusterConfigItem(
xml_entiy, "uwal_devices_path", "cluster")
_, self.uwal_log_path = ClusterConfigFile.readOneClusterConfigItem(
xml_entiy, "uwal_log_path", "cluster")
_, self.uwal_rpc_compression_switch = ClusterConfigFile.readOneClusterConfigItem(
xml_entiy, "uwal_rpc_compression_switch", "cluster")
_, self.uwal_rpc_flowcontrol_switch = ClusterConfigFile.readOneClusterConfigItem(
xml_entiy, "uwal_rpc_flowcontrol_switch", "cluster")
_, self.uwal_rpc_flowcontrol_value = ClusterConfigFile.readOneClusterConfigItem(
xml_entiy, "uwal_rpc_flowcontrol_value", "cluster")
_, self.uwal_async_append_switch = ClusterConfigFile.readOneClusterConfigItem(
xml_entiy, "uwal_async_append_switch", "cluster")
def check_conf_cm_state(self):
"""
@ -3034,6 +3068,17 @@ class dbClusterInfo():
raise Exception(ErrorCode.GAUSS_502["GAUSS_50204"] % \
"cluster network type" + " Error: \n%s" % retValue)
# Read enable_uwal
_, self.enable_uwal = ClusterConfigFile.readOneClusterConfigItem(
xmlRootNode, "enable_uwal", "cluster")
if self.enable_uwal not in ['', 'on', 'off']:
raise Exception(ErrorCode.GAUSS_500["GAUSS_50011"] %
('enable_uwal', self.enable_uwal))
if self.enable_uwal == 'on':
self.init_uwal_config(xml_entiy=xmlRootNode)
if "HOST_IP" in os.environ.keys():
self.corePath = self.__read_and_check_config_item(xmlRootNode, "corePath",
"cluster", True)
@ -3266,6 +3311,8 @@ class dbClusterInfo():
# Get DB number
dbNode.dataNum = self.__readNodeIntValue(dbNode.name, "dataNum", True,
0)
# Get enable_uwal
dbNode.enable_uwal = self.enable_uwal
# read cm directory for server and agent
try:
dbNode.cmDataDir = self.__readNodeStrValue(dbNode.name, "cmDir")

View File

@ -45,6 +45,9 @@ STANDBY_INSTANCE = 1
DUMMY_STANDBY_INSTANCE = 2
CASCADE_STANDBY_INSTANCE = 3
# uwal num
BASE_ID_GTM = 1001
BASE_ID_DATANODE = 6001
class DN_OLAP(Kernel):
'''
@ -241,6 +244,11 @@ class DN_OLAP(Kernel):
tmp_dn_dict["enable_data_replicate"] = "off"
tmp_dn_dict["replication_type"] = "1"
tmp_dn_dict["max_wal_senders"] = "16"
if self.instInfo.uwal_ip != "":
uwal_id = int(self.instInfo.instanceId) - BASE_ID_DATANODE
uwal_ip = self.instInfo.uwal_ip
uwal_port = self.instInfo.port + BASE_ID_GTM
tmp_dn_dict["uwal_config"] = "'{\\\"uwal_nodeid\\\": %d, \\\"uwal_ip\\\": \\\"%s\\\", \\\"uwal_port\\\": %d}'" % (uwal_id, uwal_ip, uwal_port)
totalnum = len(peerInsts)
for inst in peerInsts:
if inst.instanceType == CASCADE_STANDBY_INSTANCE:

View File

@ -415,6 +415,24 @@ class InstallImplOLAP(InstallImpl):
cmd_param += "*==SYMBOL==*-D*==SYMBOL==*%s" % (
"dcf_config=" + self.context.clusterInfo.dcf_config.replace('"', '\\"'))
cmd_param += "*==SYMBOL==*-X*==SYMBOL==*%s" % (self.context.xmlFile)
if self.context.clusterInfo.enable_uwal == 'on':
cmd_param += "*==SYMBOL==*-D*==SYMBOL==*%s" % (
"enable_uwal=" + self.context.clusterInfo.enable_uwal)
cmd_param += "*==SYMBOL==*-D*==SYMBOL==*%s" % (
"uwal_disk_size=" + self.context.clusterInfo.uwal_disk_size)
cmd_param += "*==SYMBOL==*-D*==SYMBOL==*%s" % (
"uwal_devices_path=" + self.context.clusterInfo.uwal_devices_path)
cmd_param += "*==SYMBOL==*-D*==SYMBOL==*%s" % (
"uwal_log_path=" + self.context.clusterInfo.uwal_log_path)
cmd_param += "*==SYMBOL==*-D*==SYMBOL==*%s" % (
"uwal_rpc_compression_switch=" + self.context.clusterInfo.uwal_rpc_compression_switch)
cmd_param += "*==SYMBOL==*-D*==SYMBOL==*%s" % (
"uwal_rpc_flowcontrol_switch=" + self.context.clusterInfo.uwal_rpc_flowcontrol_switch)
cmd_param += "*==SYMBOL==*-D*==SYMBOL==*%s" % (
"uwal_rpc_flowcontrol_value=" + self.context.clusterInfo.uwal_rpc_flowcontrol_value)
cmd_param += "*==SYMBOL==*-D*==SYMBOL==*%s" % (
"uwal_async_append_switch=" + self.context.clusterInfo.uwal_async_append_switch)
# create tmp file for guc parameters
# comm_max_datanode and max_process_memory
self.context.logger.debug("create tmp_guc file.")