Add a tool to show segment status (#2260)

In order to be aware of the convert process from AlphaRowset to BetaRowset, we need a mechanism to know the process of convert.
This commit is contained in:
LingBin
2019-11-26 21:35:16 -06:00
committed by Mingyu Chen
parent ccbd65daeb
commit 036ef5bcb9
5 changed files with 567 additions and 0 deletions

View File

@ -0,0 +1,71 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
This tool is used to get the progress of all current table transitions
during the online `segment_2` function.
Currently, you can specify 3 dimensions (in the conf file) to view the
results, you can specify one of them individually, or you can customize
the combination (that is, specify multiple at the same time).
# Note
We use MySQLdb python lib to fetch meta from FE, so you must install it.
You can get MySQLdb lib from https://pypi.python.org/pypi/MySQL-python,
then you can install it as follows:
```
$ tar zxvf MySQL-python-*.tar.gz
$ cd MySQL-python-*
$ python setup.py build
$ python setup.py install
```
# Steps
1. Fill in the conf according to your cluster configuration, and specify
the table or be you want to watch.
2. Execute `python show_segment_status.py`
# Example
1. If you want to watch the process of a table named `xxxx`, you can specify
`table_name = xxxx` in conf file
2. If you want to watch the process on be whose be_id is `xxxx`, you can specify
`be_id = xxxx` in conf file
# Output Example Format
```
==========SUMMARY()===========
rowset_count: 289845 / 289845
rowset_disk_size: 84627551189 / 84627551189
rowset_row_count: 1150899153 / 1150899153
===========================================================
==========SUMMARY(table=xxxx)===========
rowset_count: 289845 / 289845
rowset_disk_size: 84627551189 / 84627551189
rowset_row_count: 1150899153 / 1150899153
===========================================================
==========SUMMARY(be=10003 )===========
rowset_count: 79650 / 79650
rowset_disk_size: 24473921575 / 24473921575
rowset_row_count: 331449328 / 331449328
===========================================================
```

View File

@ -0,0 +1,110 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
import time
from urllib import urlopen
class BeTabletResolver:
def __init__(self, be_list, tablet_map):
self.tablet_map = tablet_map
self.tablet_infos = {}
self.be_map = {}
for be in be_list:
self.be_map[be['be_id']] = be
def debug_output(self):
print "tablet_infos:(%s), print up to ten here:" % len(self.tablet_infos)
self._print_list(self.tablet_infos.values()[0:10])
print
def _print_list(self, one_list):
for item in one_list:
print item
def init(self):
self.fetch_tablet_meta()
def fetch_tablet_meta(self):
print "fetching tablet metas from BEs..."
count = 0
for tablet in self.tablet_map.values():
be_id = tablet['be_id']
be = self.be_map[be_id]
url = self._make_url(be, tablet)
print url
tablet_meta = self._fetch_tablet_meta_by_id(url)
self._decode_rs_metas_of_tablet(tablet_meta)
# slow down, do not need too fast
count += 1
if count % 10 == 0:
time.sleep(0.005)
print "finished. \n"
return
def _make_url(self, be, tablet):
url_list = []
url_list.append("http://")
url_list.append(be["ip"])
url_list.append(":")
url_list.append(be["http_port"])
url_list.append("/api/meta/header/")
url_list.append(str(tablet["tablet_id"]))
url_list.append("/")
url_list.append(str(tablet["schema_hash"]))
return "".join(url_list)
def _fetch_tablet_meta_by_id(self, url):
tablet_meta = urlopen(url).read()
tablet_meta = json.loads(tablet_meta)
return tablet_meta
def _decode_rs_metas_of_tablet(self, tablet_meta):
# When something wrong, may do not have rs_metas attr, so use 'get()' instead of '[]'
rs_metas = tablet_meta.get('rs_metas')
if rs_metas is None:
return
size = len(rs_metas)
rowsets = []
for rs_meta in rs_metas:
rowset = {}
rowset['tablet_id'] = rs_meta['tablet_id']
rowset['num_rows'] = rs_meta['num_rows']
rowset['data_disk_size'] = rs_meta['data_disk_size']
if rs_meta['rowset_type'] == 'BETA_ROWSET':
rowset['is_beta'] = True
else:
rowset['is_beta'] = False
rowsets.append(rowset);
self.tablet_infos[rs_meta['tablet_id']] = rowsets
return
def get_rowsets_by_tablet(self, tablet_id):
return self.tablet_infos.get(tablet_id)
def get_all_rowsets(self):
return self.tablet_infos.values()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,14 @@
[cluster]
fe_host =
query_port =
user = root
query_pwd =
# Following confs are optional
# select one database
db_name =
# select one table
table_name =
# select one be. when value is 0 means all bes
be_id = 0

View File

@ -0,0 +1,245 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import MySQLdb
# NOTE: The default organization of meta info is cascading, we flatten its structure
# NOTE: We get schema-hash from proc '/dbs/db_id/tbl_id/'index_schema'
class FeMetaResolver:
def __init__(self, fe_host, query_port, user, query_pwd):
self.fe_host = fe_host
self.query_port = query_port
self.user = user
self.query_pwd = query_pwd
self.db = None
self.cur = None
self.be_list = []
self.db_list = []
# Only base tables, excluding rollups
self.table_list = []
# All rollups, including base tables
self.rollup_map = {}
self.partition_list = []
self.index_list = []
self.tablet_map = {}
def init(self):
self.connect_mysql()
self.fetch_be_list();
self.fetch_db_list();
self.fetch_table_list();
self.fetch_rollup_map();
self.fetch_partition_list();
self.fetch_idx_list();
self._merge_schema_hash_to_idx_list()
self.fetch_tablet_list();
self.close()
def connect_mysql(self):
try:
self.db = MySQLdb.connect(host=self.fe_host, port=self.query_port,
user=self.user,
passwd=self.query_pwd)
self.cur = self.db.cursor()
except MySQLdb.Error as e:
print ("Failed to connect fe server. error %s:%s" % (str(e.args[0]), e.args[1]))
exit(-1);
def exec_sql(self, sql):
try:
self.cur.execute(sql)
except MySQLdb.Error as e:
print ("exec sql error %s:%s" % (str(e.args[0]), e.args[1]))
exit(-1);
def close(self):
if self.db.open:
self.cur.close()
self.db.close()
def fetch_be_list(self):
show_be_sql = "show backends"
self.exec_sql(show_be_sql);
be_list = self.cur.fetchall()
for be_tuple in be_list :
be = {}
be['be_id'] = long(be_tuple[0])
be['ip'] = be_tuple[2]
be['http_port'] = be_tuple[5]
self.be_list.append(be)
return
def fetch_db_list(self):
show_database_sql = "show proc \"/dbs\" "
self.exec_sql(show_database_sql);
db_list = self.cur.fetchall()
for db_tuple in db_list :
db = {}
if long(db_tuple[0]) <= 0:
continue
db['db_id'] = long(db_tuple[0])
db['db_name'] = db_tuple[1]
self.db_list.append(db)
def fetch_table_list(self):
for db in self.db_list:
self._fetch_tables_by_db(db)
def _fetch_tables_by_db(self, db):
sql = "show proc \"/dbs/%s\" " % db['db_id']
self.exec_sql(sql);
table_list = self.cur.fetchall()
for table_tuple in table_list :
table = {}
table['db_id'] = db['db_id']
table['db_name'] = db['db_name']
table['tbl_id'] = long(table_tuple[0])
table['tbl_name'] = table_tuple[1]
self.table_list.append(table)
return
def fetch_rollup_map(self):
for table in self.table_list:
self._fetch_rollups_by_table(table);
def _fetch_rollups_by_table(self, table):
sql = "show proc \"/dbs/%s/%s/index_schema\" " % (table['db_id'], table['tbl_id'])
self.exec_sql(sql);
index_list = self.cur.fetchall()
for index_tuple in index_list :
index = {}
index['tbl_id'] = table['tbl_id']
index['tbl_name'] = table['tbl_name']
index['idx_id'] = long(index_tuple[0])
index['schema_hash'] = long(index_tuple[3])
self.rollup_map[index['idx_id']] = index
return
def fetch_partition_list(self):
for table in self.table_list:
self._fetch_partitions_by_table(table);
def _fetch_partitions_by_table(self, table):
sql = "show proc \"/dbs/%s/%s/partitions\" " % (table['db_id'], table['tbl_id'])
self.exec_sql(sql);
partition_list = self.cur.fetchall()
for partition_tuple in partition_list :
partition = {}
partition['db_id'] = table['db_id']
partition['db_name'] = table['db_name']
partition['tbl_id'] = table['tbl_id']
partition['tbl_name'] = table['tbl_name']
partition['partition_id'] = long(partition_tuple[0])
partition['partition_name'] = partition_tuple[1]
self.partition_list.append(partition)
return
def fetch_idx_list(self):
for partition in self.partition_list:
self._fetch_idxes_by_partition(partition);
def _fetch_idxes_by_partition(self, partition):
sql = "show proc \"/dbs/%s/%s/partitions/%s\" " % \
(partition['db_id'], partition['tbl_id'], partition['partition_id'])
self.exec_sql(sql);
index_list = self.cur.fetchall()
for idx_tuple in index_list :
idx = {}
idx['db_id'] = partition['db_id']
idx['db_name'] = partition['db_name']
idx['tbl_id'] = partition['tbl_id']
idx['tbl_name'] = partition['tbl_name']
idx['partition_id'] = partition['partition_id']
idx['partition_name'] = partition['partition_name']
idx['idx_id'] = long(idx_tuple[0])
idx['idx_name'] = idx_tuple[1]
idx['idx_state'] = idx_tuple[2]
self.index_list.append(idx)
return
def _merge_schema_hash_to_idx_list(self):
for index in self.index_list:
idx_id = index['idx_id']
rollup = self.rollup_map.get(idx_id)
index['schema_hash'] = rollup['schema_hash']
def fetch_tablet_list(self):
for index in self.index_list:
self._fetch_tablets_by_index(index);
def _fetch_tablets_by_index(self, index):
sql = "show proc \"/dbs/%s/%s/partitions/%s/%s\" " % \
(index['db_id'], index['tbl_id'], index['partition_id'], index['idx_id'])
self.exec_sql(sql);
tablet_list = self.cur.fetchall()
for tablet_tuple in tablet_list :
tablet = {}
tablet['db_id'] = index['db_id']
tablet['db_name'] = index['db_name']
tablet['tbl_id'] = index['tbl_id']
tablet['tbl_name'] = index['tbl_name']
tablet['partition_id'] = index['partition_id']
tablet['partition_name'] = index['partition_name']
tablet['idx_id'] = index['idx_id']
tablet['idx_name'] = index['idx_name']
tablet['idx_state'] = index['idx_state']
tablet['tablet_id'] = long(tablet_tuple[0])
tablet['replica_id'] = long(tablet_tuple[1])
tablet['be_id'] = long(tablet_tuple[2])
tablet['schema_hash'] = index["schema_hash"]
self.tablet_map[tablet['tablet_id']] = tablet
return
def debug_output(self):
print "be_list:"
self._print_list(self.be_list)
print
print "database_list:"
self._print_list(self.db_list)
print
print "table_list:"
self._print_list(self.table_list)
print
print "rollup_list:"
self._print_list(self.rollup_map.values())
print
print "partition_list:"
self._print_list(self.partition_list)
print
print "index_list:"
self._print_list(self.index_list)
print
print "tablet_map:(%s), print up to ten here:" % len(self.tablet_map)
self._print_list(self.tablet_map.values()[0:10])
print
def _print_list(self, one_list):
for item in one_list:
print item
def get_tablet_by_id(self, tablet_id):
return self.tablet_map.get(tablet_id)
def get_all_tablets(self):
return self.tablet_map.values()

View File

@ -0,0 +1,127 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import ConfigParser
import re
import sys
import os
import json
from urllib import urlopen
from fe_meta_resolver import FeMetaResolver
from be_tablet_reslover import BeTabletResolver
class Calc:
def __init__(self, fe_meta, be_resolver):
self.fe_meta = fe_meta
self.be_resolver = be_resolver
def calc_cluster_summary(self):
self.calc_table_and_be_summary("", "", 0)
return
def calc_table_summary(self, db_name, table_name):
self.calc_table_and_be_summary(db_name, table_name, 0)
return
def calc_be_summary(self, be_id):
self.calc_table_and_be_summary("", "", be_id)
return
def calc_table_and_be_summary(self, db_name, table_name, be_id):
total_rs_count = 0
beta_rs_count = 0
total_rs_size = 0
beta_rs_size = 0
total_rs_row_count = 0
beta_rs_row_count = 0
for tablet in self.fe_meta.get_all_tablets():
# The db_name from meta contain cluster name, so use 'in' here
if len(db_name) != 0 and (not (db_name in tablet['db_name'])):
continue
if len(table_name) != 0 and (tablet['tbl_name'] != table_name):
continue;
if be_id != 0 and tablet['be_id'] != be_id:
continue
rowsets = self.be_resolver.get_rowsets_by_tablet(tablet['tablet_id'])
# If tablet has gone away, ignore it
if rowsets is None:
continue
for tablet_info in rowsets:
total_rs_count += 1
total_rs_row_count += tablet_info['num_rows']
total_rs_size += tablet_info['data_disk_size']
if tablet_info['is_beta']:
beta_rs_count += 1
beta_rs_size += tablet_info['data_disk_size']
beta_rs_row_count += tablet_info['num_rows']
content_str = ""
if len(db_name) != 0:
content_str += ("db=%s " % db_name)
if len(table_name) != 0:
content_str += ("table=%s " % table_name)
if be_id != 0:
content_str += ("be=%s " % be_id)
print "==========SUMMARY(%s)===========" % (content_str)
print "rowset_count: %s / %s" % (beta_rs_count, total_rs_count)
print "rowset_disk_size: %s / %s" % (beta_rs_size, total_rs_size)
print "rowset_row_count: %s / %s" % (beta_rs_row_count, total_rs_row_count)
print "==========================================================="
return;
def main():
cf = ConfigParser.ConfigParser()
cf.read("./conf")
fe_host = cf.get('cluster', 'fe_host')
query_port = int(cf.get('cluster', 'query_port'))
user = cf.get('cluster', 'user')
query_pwd = cf.get('cluster', 'query_pwd')
db_name = cf.get('cluster', 'db_name')
table_name = cf.get('cluster', 'table_name')
be_id = cf.getint('cluster', 'be_id')
print "============= CONF ============="
print "fe_host =", fe_host
print "fe_query_port =", query_port
print "user =", user
print "db_name =", db_name
print "table_name =", table_name
print "be_id =", be_id
print "===================================="
fe_meta = FeMetaResolver(fe_host, query_port, user, query_pwd)
fe_meta.init()
fe_meta.debug_output()
be_resolver = BeTabletResolver(fe_meta.be_list, fe_meta.tablet_map)
be_resolver.init()
be_resolver.debug_output()
calc = Calc(fe_meta, be_resolver)
calc.calc_cluster_summary()
calc.calc_table_summary(db_name, table_name);
calc.calc_be_summary(be_id);
if __name__ == '__main__':
main()