TPC-DS (ms per run; columns: cold run, hot run 1, hot run 2, best hot run)

before:
query4    9335  8113  8070  8070
query13   3104  1386  1385  1385
query18   1704  1216  1151  1151
query48    840   840   839   839
query61    435   379   383   379
query71    715   570   579   570
query85   2822  2627  2612  2612
query88   1897  1816  1793  1793
Total cold run time: 20852 ms
Total hot run time: 16799 ms

after:
query4    9610  8287  8249  8249
query13   1721  1013  1042  1013
query18   1585  1186  1155  1155
query48    789   777   778   777
query61    384   387   381   381
query71    713   610   584   584
query85   2020  1867  1843  1843
query88   1859  1812  1805  1805
Total cold run time: 18681 ms
Total hot run time: 15807 ms
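Net effect: total cold run time drops by 2171 ms (20852 -> 18681, about 10.4%) and total hot run time by 992 ms (16799 -> 15807, about 5.9%). query13 and query85 account for most of the gain, partly offset by a slight regression in query4.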
#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
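
"""Visualize a Doris query profile as a Graphviz DAG.

Either executes a SQL file against a local FE and draws the resulting
query profile, or draws the profile of an already-executed query given
its query id. Profiles are fetched over the FE manager REST API;
graphviz's `dot` binary must be on the PATH to render the .png.
"""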

import subprocess
import json
import requests
import os
import re
import argparse

FE_HOST = "http://127.0.0.1:8030"     # Doris FE HTTP endpoint
MYSQL = "mysql -h:: -P9030 -u root "  # mysql client command prefix
DB = "tpcds_sf100"                    # database the queries run against
WORK_DIR = "./tmp/"                   # where .profile/.dot/.png files are written


def execute_command(cmd: str):
    result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
    return result.stdout


def sql(query):
    print("exec sql: " + query)
    # NOTE: the query is wrapped in double quotes on the shell command line,
    # so queries that contain double quotes would need extra escaping
    cmd = MYSQL + " -D " + DB + " -e \"" + query + "\""
    result = execute_command(cmd)
    return result
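

# The query_info endpoint is assumed (from the assertions below, not a
# documented schema) to return JSON shaped like
#   {"msg": "success", "data": {"rows": [[query_id, ...], ...]}}
# with the most recent query first, so rows[0][0] is the last query id.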
def last_query_id():
    # 'YWRtaW46' is the base64 encoding of 'admin:' (user admin, empty password)
    headers = {'Authorization': 'BASIC YWRtaW46'}
    query_info_url = FE_HOST + '/rest/v2/manager/query/query_info'
    resp_wrapper = requests.get(query_info_url, headers=headers)
    resp_text = resp_wrapper.text
    qinfo = json.loads(resp_text)
    assert qinfo['msg'] == 'success'
    assert len(qinfo['data']['rows']) > 0
    return qinfo['data']['rows'][0][0]


def get_profile(query_id):
    profile_url = FE_HOST + "/rest/v2/manager/query/profile/json/" + query_id
    headers = {'Authorization': 'BASIC YWRtaW46'}
    resp_wrapper = requests.get(profile_url, headers=headers)
    resp_text = resp_wrapper.text

    prof = json.loads(resp_text)
    assert prof['msg'] == 'success', "query failed"
    # the profile payload is itself an escaped JSON string: drop the escaped
    # newlines and unescape '=' before handing it back for a second parse
    data = prof['data']['profile'].replace("\\n", "")
    return data.replace("\\u003d", "=")
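

# The decoded profile is a tree of {"name", "rowsReturned", "totalTime",
# "children"} objects. The helpers below assume, based on the observed
# payload rather than a documented schema, that children[2] of the root
# holds the execution profile and that its first child lists the fragments.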
def get_fragments(obj):
    # fragments live under the execution-profile subtree (see note above)
    return obj['children'][2]['children'][0]['children']


def get_children(obj):
    return obj['children']


class Fragment:
    def __init__(self, obj):
        self.name = obj['name']
        self.instances = []
        for child in get_children(obj):
            self.instances.append(Instance(child))
        # build a merged instance: take the first instance as a template,
        # zero its counters, then accumulate the corresponding Node.rows
        # from every instance into it
        self.merged = Instance(get_children(obj)[0])
        self.merged.clear()
        self.merge_instances()

        m = re.search(r"Instance (\w+-\w+) ", self.merged.name)
        assert m is not None, "cannot find instance id: " + self.merged.name
        self.first_inst_id = m.group(1)

    def merge_instances(self):
        for inst in self.instances:
            self.merged.merge(inst)

    def register(self, node_map):
        self.merged.register(node_map)


class Instance:
    def __init__(self, obj):
        self.name = obj['name']
        assert len(get_children(obj)) > 1
        # the first child is the data-stream sender; its dst_id identifies
        # the exchange node in the consuming fragment
        self.sender = Node(get_children(obj)[0])
        self.dst_id = self.sender.get_dst_id()

        # the second child is the root of this instance's operator tree
        self.root = Node(get_children(obj)[1])
        if len(get_children(obj)) > 2:
            self.pipe = Node(get_children(obj)[2])

    def clear(self):
        self.root.clear()

    def merge(self, other):
        self.root.merge(other.root)

    def register(self, node_map):
        self.root.register(node_map)


class Node:
    def __init__(self, obj):
        self.obj = obj
        self.name = self.get_name(obj)
        # runtime filters carry their own id and never get a plan-node id
        if self.name == "RuntimeFilter":
            self.rf_id = self.get_rf_id(obj)
            self.id = -1
            self.caption = self.name + "_" + str(self.rf_id)
        else:
            self.id = self.get_id(obj)
            self.caption = self.name + "_" + str(self.id)

        self.rows = obj['rowsReturned']
        self.total_time = obj['totalTime']
        self.children = []
        for child in obj['children']:
            self.children.append(Node(child))

    def get_name(self, obj):
        name = obj['name']
        m = re.search(r"(\S+)", obj['name'])
        if m is not None:
            name = m.group(1)
        # sanitize so the name is a legal graphviz identifier
        return name.replace("(", '_').replace(")", '').replace(':', '')

    def get_id(self, obj):
        m = re.search(r"id=(\d+)", self.obj['name'])
        if m is not None:
            return int(m.group(1))
        else:
            return -1

    def get_rf_id(self, obj):
        m = re.search(r"id = (\d+)", self.obj['name'])
        if m is not None:
            return int(m.group(1))
        else:
            return -1

    def get_dst_id(self):
        m = re.search(r"dst_id=(\d+)", self.obj['name'])
        if m is not None:
            return int(m.group(1))
        else:
            return -1

    def clear(self):
        self.rows = 0
        self.total_time = ""
        for child in self.children:
            child.clear()

    def register(self, node_map):
        node_map[self.id] = self
        for child in self.children:
            child.register(node_map)

    def merge(self, other):
        assert self.name == other.name
        self.rows += other.rows
        # times are concatenated (e.g. "1s2ms/3s4ms"), not summed
        self.total_time += "/" + other.total_time
        child_count = len(self.children)
        # scan nodes may expose a different child layout per instance,
        # so stop merging below them
        if self.name.find("ScanNode") != -1:
            return
        assert child_count == len(other.children)
        for i in range(child_count):
            self.children[i].merge(other.children[i])

    def tree(self, prefix):
        # dump the operator tree as indented text, for debugging
        print(prefix + self.name + " id=" + str(self.id) + " rows=" + str(self.rows))
        for child in self.children:
            child.tree(prefix + "--")

    def to_dot(self):
        # emit one "child->parent[label=rows]" edge per child, then recurse;
        # nodes without a plan id (id == -1) are skipped
        content = ""
        children = self.children
        if children is not None:
            children.reverse()
        for child in children:
            if child.id != -1:
                content += "{}->{}[label={}]\n".format(child.caption, self.caption, child.rows)
        for child in children:
            if child.id != -1:
                content += child.to_dot()
        return content


def keep_alpha_numeric(input_str):
    # keep only ASCII letters and digits, so the title is safe to use as a
    # file name and inside the dot label
    pattern = r'[^A-Za-z0-9]'
    result = re.sub(pattern, '', input_str)
    return result


def to_dot_file(root, dot_file, title):
    title = keep_alpha_numeric(title)
    header = """
digraph BPMN {
rankdir=BT;
node[style="rounded,filled" color="lightgoldenrodyellow" ]
"""
    label = "label = \"" + title + "\"\n"
    tail = "}"
    with open(dot_file, 'w') as f:
        f.write(header)
        f.write(label)
        f.write(root.to_dot())
        f.write(tail)
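

# For reference, a generated .dot file looks roughly like this (the node
# captions and row counts below are hypothetical; real ones come from the
# plan's operator names and the profile's rowsReturned counters):
#
#   digraph BPMN {
#   rankdir=BT;
#   node[style="rounded,filled" color="lightgoldenrodyellow" ]
#   label = "query13"
#   EXCHANGE_NODE_4->HASH_JOIN_NODE_5[label=73049]
#   HASH_JOIN_NODE_5->AGGREGATION_NODE_6[label=73049]
#   ...
#   }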


def draw_profile(query_id, title=""):
    print("query_id: " + query_id)
    prof = get_profile(query_id)
    if title == "":
        title = query_id
    prf_file = WORK_DIR + title + ".profile"
    png_file = WORK_DIR + title + ".png"
    dot_file = WORK_DIR + title + ".dot"
    with open(prf_file, "w") as f:
        f.write(prof)
    obj = json.loads(prof.replace("\n", ""))
    fragments = []
    node_map = {}
    for json_fr in get_fragments(obj):
        fr = Fragment(json_fr)
        fr.register(node_map)
        fragments.append(fr)
    # stitch the fragments into a single tree: hang each fragment's merged
    # operator tree under the exchange node that consumes its output
    root_fr = fragments[0]
    for fr in fragments[1:]:
        exchange = node_map[fr.merged.dst_id]
        exchange.children.append(fr.merged.root)

    root_fr.merged.root.tree("")

    to_dot_file(root_fr.merged.root, dot_file, title)
    cmd = "dot {} -T png -o {}".format(dot_file, png_file)
    print(cmd)
    status = os.system(cmd)
    if status != 0:
        print("convert to png failed")
    else:
        print("generate " + png_file)


def exec_sql_and_draw(sql_file):
    if not os.path.isdir(WORK_DIR):
        os.mkdir(WORK_DIR)
    with open(sql_file, 'r') as f:
        # trim "--" sql comments so the query survives shell quoting
        query = ""
        for line in f.readlines():
            pos = line.find("--")
            if pos == -1:
                query += line
            else:
                query += line[:pos]
    # force a profiled run: nereids planner + pipeline engine, profiling on,
    # a pinned runtime filter type with pruning disabled, and a long
    # runtime-filter wait so the filters always show up in the profile
    sql("set global enable_nereids_planner=true")
    sql("set global enable_pipeline_engine=true")
    sql("set global enable_profile=true")
    sql("set global runtime_filter_type=2")
    sql("set global enable_runtime_filter_prune=false")
    sql("set global runtime_filter_wait_time_ms=9000000")
    sql(query)
    query_id = last_query_id()
    return query_id


def print_usage():
    print("""
USAGE:
1. execute a SQL file and draw its profile
   python3 profile_viewer.py -f [path to sql file]
2. draw the profile of a given query
   python3 profile_viewer.py -qid [query_id]

graphviz is required (https://graphviz.org/)
on linux: apt install graphviz
on mac: brew install graphviz
""")


if __name__ == '__main__':
    USAGE = """
1. execute a SQL file and draw its profile
   python3 profile_viewer.py -f [path to sql file] -t [png title]
2. draw the profile of a given query
   python3 profile_viewer.py -qid [query_id] -t [png title]

graphviz is required (https://graphviz.org/)
on linux: apt install graphviz
on mac: brew install graphviz
"""
    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--file", help="sql file", dest="sql_file", type=str, default="")
    parser.add_argument("-qid", "--query_id", help="query id", dest="query_id", type=str, default="")
    parser.add_argument("-t", "--title", help="query title", dest="title", type=str, default="")

    args = parser.parse_args()
    if args.sql_file == "" and args.query_id == "":
        print(USAGE)
        exit(0)
    title = args.title
    if args.query_id == "":
        query_id = exec_sql_and_draw(args.sql_file)
        draw_profile(query_id, title)
    else:
        draw_profile(args.query_id, title)