oceanbase/script/import/ob_import.py
wangzelin.wzl 93a1074b0c patch 4.0
2022-10-24 17:57:12 +08:00

193 lines
4.7 KiB
Python
Executable File

#!/usr/bin/env python
import os
import sys
import MySQLdb
import Queue
import threading
import multiprocessing
import traceback
import random
param = {}
param['data_file'] = sys.argv[1]
param['delima'] = '\1'
param['nop'] = '\2'
param['null'] = '\3'
param['table_name'] = 't1'
param['column_count'] = 4
param['host'] = "10.232.38.8"
param['port'] = 35999
param['user'] = "admin"
param['passwd'] = "admin"
param['batch_count'] = 1000
param['concurrency'] = 20
id_column_map = [
(0,'c1'),
(1,'c2'),
(2,'c3'),
(3,'c4'),
]
param['id_column_map'] = id_column_map
for k,v in param.items():
print k, v
class MyExcept(BaseException):
pass
def get_column_type(cursor, table_name):
column_type_map = {}
if cursor.execute("select table_id from __first_tablet_entry where table_name = '%s'" % table_name) != 1:
raise MyExcept
table_id = cursor.fetchone()[0]
if cursor.execute("select column_name, data_type from __all_all_column where table_id = %d" % table_id) <= 0:
raise MyExcept
for item in cursor.fetchall():
column_type_map[item[0]] = item[1]
return column_type_map
def add_value(count, line_num, tokens, set_values, execute_values):
for id_column in id_column_map:
id = id_column[0]
if tokens[id] == param['nop']:
execute_values.append('@nop')
elif tokens[id] == param['null']:
execute_values.append('@null')
else:
var_name = '@a%d' % count
execute_values.append(var_name)
set_values.append("%s='%s'" % (var_name, tokens[id]))
count += 1
return count
def gen_replace_sql(**param):
values = "(%s)" % ','.join([ '?' for i in xrange(0,len(id_column_map)) ])
values = ",".join([ values for i in xrange(0, param["batch_count"]) ])
column_def = ','.join([ i[1] for i in id_column_map ])
param["values"] = values
param["column_def"] = column_def
return "prepare p1 from replace into %(table_name)s(%(column_def)s) values%(values)s" % param
def worker():
conn = MySQLdb.connect(host=param["host"], port=param["port"], user=param["user"], passwd = param["passwd"])
conn.autocommit(True)
cursor = conn.cursor()
replace_sql = gen_replace_sql(**param)
cursor.execute(replace_sql)
print "work", os.getpid()
while True:
lines = q.get()
if lines == None:
break
if len(lines) != param['batch_count']:
raise MyExcept
set_values = []
execute_values = []
count = 0
for i in xrange(0, len(lines)):
line = lines[i]
tokens = line.split(param['delima'])
if len(tokens) != param['column_count']:
print tokens
raise MyExcept
count = add_value(count, i, tokens, set_values, execute_values);
set_sql = "set " + ",".join(set_values)
execute_sql = "execute p1 using " + ",".join(execute_values)
try:
cursor.execute(set_sql)
cursor.execute(execute_sql)
except:
print set_sql
print execute_sql
print "".join(traceback.format_exception(sys.exc_type, sys.exc_value, sys.exc_traceback))
cursor.close()
conn.close()
def import_lines(lines):
conn = MySQLdb.connect(host=param["host"], port=param["port"], user=param["user"], passwd = param["passwd"])
conn.autocommit(True)
cursor = conn.cursor()
param['batch_count'] = len(lines)
replace_sql = gen_replace_sql(**param)
cursor.execute(replace_sql)
if len(lines) != param['batch_count']:
raise MyExcept
set_values = []
execute_values = []
count = 0
for i in xrange(0, len(lines)):
line = lines[i]
tokens = line.split(param['delima'])
if len(tokens) != param['column_count']:
print tokens
raise MyExcept
count = add_value(count, i, tokens, set_values, execute_values);
set_sql = "set " + ",".join(set_values)
execute_sql = "execute p1 using " + ",".join(execute_values)
try:
cursor.execute(set_sql)
cursor.execute(execute_sql)
except:
print set_sql
print execute_sql
print "".join(traceback.format_exception(sys.exc_type, sys.exc_value, sys.exc_traceback))
cursor.close()
conn.close()
f = open(param['data_file'])
line_count = 0
q = multiprocessing.Queue(param['concurrency'] * 2)
threads = []
for i in xrange(param['concurrency']):
t = multiprocessing.Process(target=worker)
threads.append(t)
t.daemon = True
t.start()
print "main", os.getpid()
while True:
flag = False
lines = []
for i in xrange(0, param['batch_count']):
line = f.readline()
if 0 == len(line):
flag = True
break
line = line.strip()
lines.append(line)
if flag:
if len(lines) != 0:
import_lines(lines)
break
line_count += len(lines)
if line_count % 10000 == 0:
print line_count
if len(lines) == param['batch_count']:
q.put(lines, timeout = 3)
for i in xrange(0, param['concurrency']):
q.put(None)
f.close()
for t in threads:
t.join()