193 lines
		
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			193 lines
		
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python
 | |
| 
 | |
| import os
 | |
| import sys
 | |
| import MySQLdb
 | |
| import Queue
 | |
| import threading 
 | |
| import multiprocessing
 | |
| import traceback
 | |
| import random
 | |
| 
 | |
# The data file path is the only command-line argument.  Fail with a usage
# message instead of an IndexError traceback when it is missing.
if len(sys.argv) < 2:
    raise SystemExit("usage: %s <data_file>" % sys.argv[0])

# (token index, column name) pairs: the token at that index of each split
# input line is written to that column of the target table.
id_column_map = [
    (0, 'c1'),
    (1, 'c2'),
    (2, 'c3'),
    (3, 'c4'),
]

param = {
    # Input file, read line by line by the driver loop.
    'data_file': sys.argv[1],
    # Separator each input line is split on.
    'delima': '\1',
    # A token equal to this is sent as the @nop user variable.
    'nop': '\2',
    # A token equal to this is sent as the @null user variable.
    'null': '\3',

    # Target table of the generated "replace into" statement.
    'table_name': 't1',
    # Number of tokens every input line must split into.
    'column_count': 4,
    # Connection settings handed to MySQLdb.connect().
    'host': "127.0.0.1",
    'port': 35999,
    'user': "admin",
    'passwd': "admin",
    # Rows per prepared statement, and lines per item placed on the queue.
    'batch_count': 1000,
    # Number of worker processes; the queue holds twice as many batches.
    'concurrency': 20,

    'id_column_map': id_column_map,
}

# Echo the effective configuration, one "key value" pair per line.
for k, v in param.items():
    print("%s %s" % (k, v))
 | |
| 
 | |
class MyExcept(Exception):
    """Error raised by this script when the import cannot proceed.

    It is raised when a metadata query returns an unexpected number of rows,
    when a batch does not contain the configured number of lines, or when a
    line does not split into the configured number of tokens.

    It derives from ``Exception`` rather than directly from ``BaseException``
    so that ordinary ``except Exception`` handlers see it; it is still a
    ``BaseException``, so existing handlers keep working.
    """
 | |
| 
 | |
def get_column_type(cursor, table_name):
    """Return a ``{column_name: data_type}`` dict for ``table_name``.

    The table id is looked up in ``__first_tablet_entry`` and the columns are
    then read from ``__all_all_column``.

    Parameters:
        cursor: DB-API cursor whose ``execute()`` returns the number of rows
            produced by the statement (as MySQLdb's does).
        table_name: name of the table to describe.

    Raises:
        MyExcept: if the table lookup does not return exactly one row, or if
            the column lookup returns no rows.
    """
    # Values are passed as parameters so the driver quotes them; building the
    # statement with "%" breaks on (and is injectable through) a quote in the
    # table name.
    found = cursor.execute(
        "select table_id from __first_tablet_entry where table_name = %s",
        (table_name,))
    if found != 1:
        raise MyExcept("expected exactly one table named %r, found %r"
                       % (table_name, found))
    table_id = cursor.fetchone()[0]

    found = cursor.execute(
        "select column_name, data_type from __all_all_column where table_id = %s",
        (table_id,))
    if found <= 0:
        raise MyExcept("no columns found for table %r (table_id %r)"
                       % (table_name, table_id))

    # Each row is (column_name, data_type).
    return dict((row[0], row[1]) for row in cursor.fetchall())
 | |
| 
 | |
| 
 | |
def add_value(count, line_num, tokens, set_values, execute_values):
    """Append one input line's values to the pending batch.

    For every ``(index, column)`` entry of the module-level ``id_column_map``
    the token at ``index`` is examined:

    * equal to ``param['nop']``  -> ``'@nop'`` is appended to ``execute_values``;
    * equal to ``param['null']`` -> ``'@null'`` is appended to ``execute_values``;
    * anything else -> a fresh variable name ``@a<count>`` is appended to
      ``execute_values`` and a matching ``@a<count>='<token>'`` assignment is
      appended to ``set_values``.

    Parameters:
        count: next free number for ``@a<N>`` variable names.
        line_num: index of the line within the batch (accepted but not used).
        tokens: the fields of one split input line.
        set_values: list extended in place with variable assignments.
        execute_values: list extended in place with the names passed to
            ``execute ... using``.

    Returns:
        The updated ``count``.
    """
    for column in id_column_map:
        token_index = column[0]
        if tokens[token_index] == param['nop']:
            execute_values.append('@nop')
            continue
        if tokens[token_index] == param['null']:
            execute_values.append('@null')
            continue

        variable = '@a%d' % count
        execute_values.append(variable)
        # NOTE(review): the token is placed between single quotes without any
        # escaping, so a token containing a quote would presumably produce a
        # malformed statement -- confirm how the input data is prepared.
        set_values.append("%s='%s'" % (variable, tokens[token_index]))
        count += 1
    return count
 | |
| 
 | |
def gen_replace_sql(**param):
    """Build the ``prepare p1 from replace into ...`` statement for one batch.

    Keyword arguments (extra keys are ignored):
        table_name: table named in the statement.
        batch_count: number of ``(?,?,...)`` row groups to emit.
        id_column_map: optional sequence of ``(token index, column name)``
            pairs.  When omitted, the module-level ``id_column_map`` is used.

    Returns:
        The statement text, with one ``?`` placeholder per mapped column in
        each of the ``batch_count`` row groups.
    """
    # Callers pass the whole settings dict, which already carries the column
    # map; prefer it over the module global so the function does not depend
    # on hidden state.  The global is only looked up when the key is absent.
    if "id_column_map" in param:
        column_map = param["id_column_map"]
    else:
        column_map = id_column_map

    row_group = "(%s)" % ",".join(["?"] * len(column_map))

    fields = dict(param)
    fields["values"] = ",".join([row_group] * param["batch_count"])
    fields["column_def"] = ",".join([entry[1] for entry in column_map])
    return "prepare p1 from replace into %(table_name)s(%(column_def)s) values%(values)s" % fields
 | |
| 
 | |
def _connect():
    """Open an autocommit connection using the host/port/user/passwd settings in ``param``."""
    conn = MySQLdb.connect(host=param["host"], port=param["port"],
                           user=param["user"], passwd=param["passwd"])
    conn.autocommit(True)
    return conn


def _build_batch_sql(lines):
    """Turn raw input lines into the ``(set_sql, execute_sql)`` pair for one batch.

    ``set_sql`` is ``None`` when no line contributed a variable assignment.
    Raises MyExcept when a line does not split into ``param['column_count']``
    tokens.
    """
    set_values = []
    execute_values = []
    count = 0
    for line_num, line in enumerate(lines):
        tokens = line.split(param['delima'])
        if len(tokens) != param['column_count']:
            print(tokens)
            raise MyExcept("line %d of batch has %d fields, expected %d"
                           % (line_num, len(tokens), param['column_count']))
        count = add_value(count, line_num, tokens, set_values, execute_values)

    set_sql = None
    if set_values:
        set_sql = "set " + ",".join(set_values)
    execute_sql = "execute p1 using " + ",".join(execute_values)
    return set_sql, execute_sql


def _run_batch(cursor, lines):
    """Execute one batch through the already prepared statement ``p1``.

    Database errors are reported (both statements plus the traceback) and
    then swallowed, so one failing batch does not stop the import.
    """
    # Built outside the try block: a malformed line must propagate to the
    # caller rather than be reported as a database failure.
    set_sql, execute_sql = _build_batch_sql(lines)
    try:
        # Skip the assignment statement when there is nothing to assign; a
        # bare "set " would fail and the execute below would never run.
        if set_sql is not None:
            cursor.execute(set_sql)
        cursor.execute(execute_sql)
    except Exception:
        if set_sql is not None:
            print(set_sql)
        print(execute_sql)
        print(traceback.format_exc())


def worker():
    """Consume batches from the module-level queue ``q`` until a ``None`` sentinel arrives.

    Each worker opens its own connection, prepares the replace statement for
    ``param['batch_count']`` rows once, and then executes it for every batch
    taken from the queue.

    Raises:
        MyExcept: if a batch does not hold exactly ``param['batch_count']``
            lines, or a line has the wrong number of fields.
    """
    conn = _connect()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(gen_replace_sql(**param))

            print("work %d" % os.getpid())

            while True:
                lines = q.get()
                if lines is None:
                    break
                if len(lines) != param['batch_count']:
                    raise MyExcept("batch has %d lines, expected %d"
                                   % (len(lines), param['batch_count']))
                _run_batch(cursor, lines)
        finally:
            cursor.close()
    finally:
        # Release the connection even when a batch is rejected.
        conn.close()


def import_lines(lines):
    """Import ``lines`` as a single batch of arbitrary size over a fresh connection.

    The statement is prepared for exactly ``len(lines)`` rows.  The shared
    ``param`` dict is left unmodified; the batch size is overridden only in
    the copy handed to ``gen_replace_sql``.  Nothing is done for an empty
    list.

    Raises:
        MyExcept: if a line has the wrong number of fields.
    """
    if not lines:
        # A zero-row statement would have an empty "values" clause.
        return

    conn = _connect()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(gen_replace_sql(**dict(param, batch_count=len(lines))))
            _run_batch(cursor, lines)
        finally:
            cursor.close()
    finally:
        conn.close()
 | |
| 
 | |
| 
 | |
| 
 | |
def _enqueue(item):
    """Put ``item`` on ``q``, waiting for as long as some worker is still alive.

    The put is retried in 3-second slices.  A full queue is only treated as
    fatal (``Queue.Full`` is re-raised) once no worker process is left to
    drain it; workers that are merely slow no longer abort the import.
    """
    while True:
        try:
            q.put(item, timeout=3)
            return
        except Queue.Full:
            if not any(proc.is_alive() for proc in threads):
                raise


# Opened before the workers are started so that an unreadable data file
# fails before any process is spawned.
f = open(param['data_file'])
try:
    line_count = 0
    # Bounded queue: at most two pending batches per worker.
    q = multiprocessing.Queue(param['concurrency'] * 2)

    # Despite the name, these are worker *processes*.
    threads = []
    for i in range(param['concurrency']):
        t = multiprocessing.Process(target=worker)
        threads.append(t)
        t.daemon = True
        t.start()

    print("main %d" % os.getpid())

    while True:
        lines = []
        for i in range(param['batch_count']):
            line = f.readline()
            if not line:
                break
            # NOTE(review): strip() removes leading and trailing whitespace,
            # not only the line terminator -- confirm that whitespace at the
            # edges of the first/last field is not significant.
            lines.append(line.strip())

        if len(lines) < param['batch_count']:
            # End of file: the workers only accept full batches, so a
            # leftover partial batch is imported directly in this process.
            if lines:
                import_lines(lines)
            break

        line_count += len(lines)
        if line_count % 10000 == 0:
            print(line_count)
        _enqueue(lines)

    # One sentinel per worker tells it to stop.
    for i in range(param['concurrency']):
        _enqueue(None)
finally:
    f.close()

for t in threads:
    t.join()
 | |
| 
 | |
| 
 | 
