158 lines
5.5 KiB
Python
158 lines
5.5 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Licensed to the Apache Software Foundation (ASF) under one
|
|
or more contributor license agreements. See the NOTICE file
|
|
distributed with this work for additional information
|
|
regarding copyright ownership. The ASF licenses this file
|
|
to you under the Apache License, Version 2.0 (the
|
|
"License"); you may not use this file except in compliance
|
|
with the License. You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing,
|
|
software distributed under the License is distributed on an
|
|
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
KIND, either express or implied. See the License for the
|
|
specific language governing permissions and limitations
|
|
under the License.
|
|
"""
|
|
|
|
import os
import subprocess
import time
|
|
|
|
|
|
class DorisMiniLoadClient(object):
    """Load a local file into a Doris table through the HTTP mini-load API (via curl)."""

    def __init__(self, db_host, db_port, db_name,
                 db_user, db_password, file_name, table, load_timeout):
        """
        Store the connection and load parameters.

        :param db_host: host used in the mini-load URL
        :param db_port: port used in the mini-load URL
        :param db_name: target database name
        :param db_user: user name passed to curl's ``-u``
        :param db_password: password passed to curl's ``-u``
        :param file_name: path of the local file to upload
        :param table: target table name
        :param load_timeout: mini-load timeout in seconds, sent as the
            ``timeout`` URL parameter (the demo below uses 86400).
        """
        self.file_name = file_name
        self.table = table
        self.load_host = db_host
        self.load_port = db_port
        self.load_database = db_name
        self.load_user = db_user
        self.load_password = db_password
        self.load_timeout = load_timeout

    def get_label(self):
        """
        Build the load label from the table name and the file's base name.

        :return: label of the form ``<table>_<basename(file_name)>``
        """
        return '_'.join([self.table, os.path.basename(self.file_name)])

    def _build_load_url(self, label):
        """Return the mini-load URL for ``label``, including the timeout parameter."""
        # One %s per argument: host, port, database, table, label, timeout.
        # NOTE(review): label is not URL-encoded; file names containing '&',
        # '#', spaces, etc. would corrupt the query string.
        return "http://%s:%s/api/%s/%s/_load?label=%s&timeout=%s" % (
            self.load_host, self.load_port, self.load_database,
            self.table, label, self.load_timeout)

    def load_doris(self, max_retries=3):
        """
        Upload ``file_name`` to Doris with curl, retrying failed attempts.

        Every attempt reuses the same label.

        :param max_retries: maximum number of curl attempts (default 3).
        :return: the mini-load label. It is returned whether or not an
            attempt succeeded, so the caller can check its status afterwards.
        """
        label = self.get_label()
        load_cmd = ["curl", "--location-trusted",
                    "-u", "%s:%s" % (self.load_user, self.load_password),
                    "-T", "%s" % self.file_name,
                    self._build_load_url(label)]

        for retry_time in range(max_retries):
            # NOTE: curl exits with 0 for HTTP error responses unless --fail is
            # given, so only transport-level failures trigger a retry here.
            returncode = subprocess.call(load_cmd)
            if returncode == 0:
                print("Load to doris success! LABEL is %s, Retry time is %d " % (label, retry_time))
                break
            print("Load to doris failed! LABEL is %s, Retry time is %d " % (label, retry_time))
        else:
            print("Load to doris failed after %d attempts! LABEL is %s" % (max_retries, label))

        return label

    @classmethod
    def check_load_status(cls, label, host, port, user, password, database,
                          timeout=60 * 60, poll_interval=5):
        """
        Poll ``SHOW LOAD`` until the mini load finishes, is cancelled, or times out.

        :param label: mini-load label (as returned by load_doris)
        :param host: db host
        :param port: db port for the MySQL-protocol connection; int or numeric string
        :param user: db user
        :param password: db password
        :param database: db database
        :param timeout: seconds to keep polling before giving up (default 3600)
        :param poll_interval: seconds to sleep between polls (default 5)
        :return: the last observed load state ('FINISHED', 'CANCELLED', or the
            state seen when the timeout expired), or None if no load with this
            label exists.
        """
        # Imported here so load_doris() keeps working on hosts that do not
        # have the third-party MySQLdb (mysqlclient) driver installed.
        import MySQLdb

        # Parameterised query: MySQLdb quotes and escapes the label itself.
        check_status_sql = ("show load where label = %s "
                            "order by CreateTime desc limit 1")

        db_conn = MySQLdb.connect(host=host, port=int(port), user=user,
                                  passwd=password, db=database)
        try:
            db_cursor = db_conn.cursor()
            waited = 0
            while True:
                db_cursor.execute(check_status_sql, (label,))
                rows = db_cursor.fetchall()
                if not rows:
                    print("Load label: %s doesn't exist" % label)
                    return None

                # NOTE(review): assumes column 2 of the SHOW LOAD row is the
                # job state — confirm against the server version in use.
                load_status = rows[0][2]
                print("mini load status: %s" % load_status)
                if load_status == 'FINISHED':
                    print("Async mini load to db success! label is %s" % label)
                    return load_status
                if load_status == 'CANCELLED':
                    print("Async load to db failed! label is %s" % label)
                    return load_status

                if waited >= timeout:
                    break
                time.sleep(poll_interval)
                waited += poll_interval
        finally:
            db_conn.close()

        # Report the configured timeout, not the (exhausted) remaining time.
        print("Async load to db timeout! timeout second is: %s, label is %s" % (timeout, label))
        return load_status
|
|
|
|
|
|
if __name__ == '__main__':
    # mini_load demo.
    # subprocess is part of the Python 2.7 standard library, so nothing extra
    # has to be installed for load_doris(); check_load_status() additionally
    # uses the MySQLdb driver.
    # Fill in the db config & load params below before running.
    db_host = "db_conn_host"
    # NOTE(review): this one port is used both for the HTTP load request in
    # load_doris() and for the MySQL connection in check_load_status(); if the
    # server exposes those on different ports, two settings are needed — confirm.
    db_port = "port"
    db_name = "db_name"
    db_user = "db_user"
    db_password = "db_password"
    file_name = "file_name"
    table = "db_table"
    # default load_time_out, seconds
    load_timeout = 86400

    doris_client = DorisMiniLoadClient(
        db_host, db_port, db_name, db_user, db_password, file_name, table, load_timeout)
    label = doris_client.load_doris()
    doris_client.check_load_status(label, db_host, db_port, db_user, db_password, db_name)
    print("load to doris end")
|