#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
"""

import re
import time
import os
import sys

import MySQLdb

# Expected shape of the info string left on the cursor by the insert statement:
# {'label':'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'}
_LABEL_PATTERN = re.compile(
    r'{\'label\':\'([a-zA-Z0-9]{8}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{12})\'}')

# Load states after which polling stops.
_FINAL_LOAD_STATES = ('FINISHED', 'CANCELLED')

# Total time to wait for the load to reach a final state, and the pause between polls.
_LOAD_TIMEOUT_SECONDS = 60 * 60
_POLL_INTERVAL_SECONDS = 5


def insert_func(host, port, user, password, database, select_sql, insert_sql):
    """
    insert into doris table select xxx

    Opens a connection, runs the insert task through insert_process() and
    always closes the cursor and the connection afterwards, including when
    insert_process() terminates the script with an error.

    :param host: host passed to MySQLdb.connect
    :param port: port passed to MySQLdb.connect
    :param user: user name passed to MySQLdb.connect
    :param password: password passed to MySQLdb.connect
    :param database: database name passed to MySQLdb.connect
    :param select_sql: SELECT column1, column2,..., columnN [FROM TABLE_X WHERE xxx]
    :param insert_sql: INSERT INTO TABLE_Y[(column1, column2,...,columnN)]
    :return: None
    """
    db_conn = MySQLdb.connect(host=host, port=port, user=user, passwd=password, db=database)
    try:
        db_cursor = db_conn.cursor()
        try:
            insert_process(select_sql, insert_sql, db_cursor)
        finally:
            db_cursor.close()
    finally:
        db_conn.close()


def _extract_label(label_info):
    """Return the load label contained in label_info, or exit if it is missing or malformed."""
    match = _LABEL_PATTERN.match(label_info) if label_info else None
    if match is None:
        sys.exit("error: can not find load label in insert result: %r" % (label_info,))
    return match.group(1)


def _wait_for_load(cursor, status_sql):
    """Poll status_sql until the load reaches a final state or the timeout expires; return the last state."""
    remaining = _LOAD_TIMEOUT_SECONDS
    while True:
        cursor.execute(status_sql)
        rows = cursor.fetchall()
        # Column index 2 of the first row is used as the job state. An empty
        # result is treated as "not finished yet" and simply polled again.
        load_status = rows[0][2] if rows else ""
        print("insert status: " + (load_status or "UNKNOWN"))
        if load_status in _FINAL_LOAD_STATES or remaining <= 0:
            return load_status
        time.sleep(_POLL_INTERVAL_SECONDS)
        remaining -= _POLL_INTERVAL_SECONDS


def insert_process(select_sql, insert_sql, cursor):
    """
    issue insert task and check insert task status.

    The select is executed on its own first; when it returns no rows nothing
    is inserted. Otherwise "<insert_sql> <select_sql>" is executed, the load
    label is read from the cursor's info string and "show load" is polled
    until the job is FINISHED or CANCELLED, or the timeout expires.

    :param select_sql: SELECT column1, column2,..., columnN [FROM TABLE_X WHERE xxx]
    :param insert_sql: INSERT INTO TABLE_Y[(column1, column2,...,columnN)]
    :param cursor: open database cursor used for every statement
    :return: None
    :raises SystemExit: when the label cannot be read, the load is CANCELLED,
        or the load does not finish before the timeout
    """
    print(insert_sql)
    print(select_sql)

    cursor.execute(select_sql)
    if not cursor.fetchall():
        print("select result is empty, don't need insert")
        return

    # The separating space matters: without it an insert_sql that has no
    # column list ("INSERT INTO TABLE_Y") would fuse with "SELECT".
    cursor.execute(insert_sql + " " + select_sql)

    # NOTE(review): _info is a private attribute of the MySQLdb cursor; it is
    # read defensively because its presence is not guaranteed by the driver.
    label = _extract_label(getattr(cursor, '_info', None))
    print(label)

    # check insert task status
    sql = "show load where label = '" + label + "' order by CreateTime desc limit 1"
    print(sql)
    load_status = _wait_for_load(cursor, sql)

    if load_status == "CANCELLED":
        sys.exit("error: insert data CANCELLED")
    elif load_status != "FINISHED":
        sys.exit("error: insert data timout")
    else:
        print("insert success.")


if __name__ == '__main__':
    # Before you run this demo, do the following.
    #
    # First, install MySQLdb; run as root:
    #     pip install MySQL-python
    # If you hit the error "mysql_config not found", this command solves it:
    #     ln -s /usr/local/mysql/bin/mysql_config /usr/local/bin/mysql_config
    #
    # Second, fill in your db connection config and your insert/select sql below.
    insert_func("127.0.0.1", 8080, 'user', 'password', 'test_db',
                "SELECT column1, column2 FROM TABLE_X WHERE column1 = 'test'",
                "INSERT INTO TABLE_Y(column1, column2)")