#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

############################################################################
#
# @file test_sys_load_parse_from_path.py
# @date 2019-09-10 16:02:05
# @brief This file is a test file for broker load parse columns from file path
#
#############################################################################

"""
|
|
测试broker load,从文件路径中解析数据
|
|
"""
|
|
|
|
import os
import sys

file_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.append(file_dir)
file_dir = os.path.abspath(os.path.dirname(__file__))

from data import schema as DATA
from data import load_file as FILE
from lib import palo_config
from lib import palo_client
from lib import util

client = None
LOG = palo_client.LOG
L = palo_client.L

config = palo_config.config
broker_info = palo_config.broker_info

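# For orientation only: the cases below exercise broker load with columns parsed from
# the file path. The statement assembled by palo_client is roughly of the form sketched
# here (an illustrative assumption; label, paths and broker credentials are hypothetical,
# the real values come from FILE, config and util.get_label()):
#
#   LOAD LABEL example_db.example_label
#   (
#       DATA INFILE("hdfs://host:port/.../k1=1/k2=2/city=bj/data.txt")
#       INTO TABLE example_table
#       (tmp_k1, tmp_k2, k3, k4, k5, v1, v2, v3, v4, v5, v6)
#       COLUMNS FROM PATH AS (k1, k2, city)
#       SET (v3 = city)
#   )
#   WITH BROKER 'broker_name' ("username" = "...", "password" = "...");
#
# Columns listed in COLUMNS FROM PATH AS are taken from "key=value" directories in the
# file path rather than from the file content, and can then be referenced in SET.
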
def setup_module():
    """
    setUp
    """
    global client
    client = palo_client.PaloClient(config.fe_host, config.fe_query_port, user=config.fe_user,
                                    password=config.fe_password)
    client.init()


def init_check_table(db_name, check_table='check_tb'):
    """Init the check table used for data verification."""
    partition_name_list = ['partition_a', 'partition_b', 'partition_c', 'partition_d']
    partition_value_list = ['5', '20', '30', 'MAXVALUE']
    partition_info = palo_client.PartitionInfo('k1', partition_name_list,
                                               partition_value_list)
    distribution_info = palo_client.DistributionInfo('HASH(k2)', 5)
    ret = client.create_table(check_table, DATA.partition_column_list, partition_info,
                              distribution_info, keys_desc='AGGREGATE KEY (k1, k2, k3, k4, k5)',
                              database_name=db_name, set_null=True)
    assert ret
    column_name_list = ['k1', 'k2', 'k3', 'k4', 'k5', 'v1', 'v2', 'v3', 'v4', 'v5', 'v6']
    data_desc = palo_client.LoadDataInfo(FILE.parse_hdfs_file_path_normal, check_table,
                                         column_name_list=column_name_list)
    ret = client.batch_load(util.get_label(), data_desc, is_wait=True, broker=broker_info,
                            database_name=db_name)
    assert ret


def check2(sql1, sql2):
    """check select result"""
    ret1 = client.execute(sql1)
    ret2 = client.execute(sql2)
    LOG.info(L('sql', sql=sql1))
    LOG.info(L('check sql', check_sql=sql2))
    util.check(ret1, ret2)


def test_parse_key_from_path():
    """
    {
    "title": "test_sys_load_parse_from_path.test_parse_key_from_path",
    "describe": "parse partition key from file path; parse key columns and partition columns from the path",
    "tag": "system,p1"
    }
    """
    """
    parse partition key from file path
    parse key columns and partition columns from the path
    """
    database_name, table_name, rollup_table_name = util.gen_name_list()
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    partition_name_list = ['partition_a', 'partition_b', 'partition_c', 'partition_d']
    partition_value_list = ['5', '20', '30', 'MAXVALUE']
    partition_info = palo_client.PartitionInfo('k1', partition_name_list,
                                               partition_value_list)
    distribution_info = palo_client.DistributionInfo('HASH(k2)', 5)
    ret = client.create_table(table_name, DATA.partition_column_list, partition_info,
                              distribution_info, keys_desc='AGGREGATE KEY (k1, k2, k3, k4, k5)')
    assert ret
    column_name_list = ['tmp_k1', 'tmp_k2', 'k3', 'k4', 'k5', 'v1', 'v2', 'v3', 'v4', 'v5', 'v6']
    column_from_path = ['k1', 'k2', 'city']
    data_desc = palo_client.LoadDataInfo(FILE.parse_hdfs_file_path_normal, table_name,
                                         column_name_list=column_name_list, columns_from_path=column_from_path)
    ret = client.batch_load(util.get_label(), data_desc, is_wait=True, broker=broker_info)
    assert ret
    check_table = 'check_tb'
    init_check_table(database_name)
    sql1 = 'select * from %s order by 1, 2, 3' % table_name
    sql2 = 'select -1, 0, k3, k4, k5, v1, v2, v3, v4, v5, v6 from %s order by k3' % check_table
    check2(sql1, sql2)
    client.clean(database_name)


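# Sketch of the idea behind the next case (illustrative assumption, not the exact
# statement that palo_client generates): a column parsed from the path can feed a SET
# expression, e.g.
#
#   (tmp_k1, tmp_k2, k3, k4, k5, v1, v2, tmp_v3, v4, v5, v6)
#   COLUMNS FROM PATH AS (k1, k2, city)
#   SET (v3 = city)
#
# so v3 is filled with the "city" value taken from the directory name; the check query
# below expects that value to be "bj".
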
def test_parse_from_path_set():
    """
    {
    "title": "test_sys_load_parse_from_path.test_parse_from_path_set",
    "describe": "parse value column from file path and set; use the columns parsed from the path in SET expressions",
    "tag": "system,p1"
    }
    """
    """
    parse value column from file path and set
    parse value columns from the path
    use the columns parsed from the path in SET expressions
    """
    database_name, table_name, rollup_table_name = util.gen_name_list()
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    partition_name_list = ['partition_a', 'partition_b', 'partition_c', 'partition_d']
    partition_value_list = ['5', '20', '30', 'MAXVALUE']
    partition_info = palo_client.PartitionInfo('k1', partition_name_list,
                                               partition_value_list)
    distribution_info = palo_client.DistributionInfo('HASH(k2)', 5)
    ret = client.create_table(table_name, DATA.partition_column_list, partition_info,
                              distribution_info, keys_desc='AGGREGATE KEY (k1, k2, k3, k4, k5)')
    assert ret
    column_name_list = ['tmp_k1', 'tmp_k2', 'k3', 'k4', 'k5', 'v1', 'v2', 'tmp_v3', 'v4', 'v5', 'v6']
    column_from_path = ['k1', 'k2', 'city']
    set_list = ['v3=city']
    data_desc = palo_client.LoadDataInfo(FILE.parse_hdfs_file_path_normal, table_name, set_list=set_list,
                                         column_name_list=column_name_list, columns_from_path=column_from_path)
    ret = client.batch_load(util.get_label(), data_desc, is_wait=True, broker=broker_info)
    assert ret
    check_table = 'check_tb'
    init_check_table(database_name)
    sql1 = 'select * from %s order by 1, 2, 3' % table_name
    sql2 = 'select -1, 0, k3, k4, k5, v1, v2, "bj", v4, v5, v6 from %s order by k3' % check_table
    check2(sql1, sql2)
    client.clean(database_name)


def test_parse_special_from_path():
    """
    {
    "title": "test_sys_load_parse_from_path.test_parse_special_from_path",
    "describe": "parse special char empty from path to char column; parse an empty value from the path",
    "tag": "system,p1,fuzz"
    }
    """
    """
    parse special char empty from path to char column
    parse an empty value from the path
    """
    database_name, table_name, rollup_table_name = util.gen_name_list()
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    partition_name_list = ['partition_a', 'partition_b', 'partition_c', 'partition_d']
    partition_value_list = ['5', '20', '30', 'MAXVALUE']
    partition_info = palo_client.PartitionInfo('k1', partition_name_list,
                                               partition_value_list)
    distribution_info = palo_client.DistributionInfo('HASH(k2)', 5)
    ret = client.create_table(table_name, DATA.partition_column_list, partition_info,
                              distribution_info, keys_desc='AGGREGATE KEY (k1, k2, k3, k4, k5)')
    assert ret
    column_name_list = ['k1', 'k2', 'k3', 'k4', 'k5', 'v1', 'v2', 'tmp_v3', 'v4', 'v5', 'v6']
    column_from_path = ['city']
    set_list = ['v3=city']
    data_desc = palo_client.LoadDataInfo(FILE.parse_hdfs_file_path_empty, table_name, set_list=set_list,
                                         column_name_list=column_name_list, columns_from_path=column_from_path)
    ret = client.batch_load(util.get_label(), data_desc, is_wait=True, broker=broker_info)
    assert ret
    check_table = 'check_tb'
    init_check_table(database_name)
    sql1 = 'select * from %s order by 1, 2, 3' % table_name
    sql2 = 'select k1, k2, k3, k4, k5, v1, v2, "", v4, v5, v6 from %s order by k3' % check_table
    check2(sql1, sql2)
    client.clean(database_name)


def test_parse_special_from_path_1():
    """
    {
    "title": "test_sys_load_parse_from_path.test_parse_special_from_path_1",
    "describe": "parse special char empty from path to int column; an empty path value loaded into a nullable int column becomes NULL",
    "tag": "system,p1,fuzz"
    }
    """
    """
    parse special char empty from path to int column
    an empty value parsed from the path into a nullable int column is loaded as NULL
    """
    database_name, table_name, rollup_table_name = util.gen_name_list()
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    partition_name_list = ['partition_a', 'partition_b', 'partition_c', 'partition_d']
    partition_value_list = ['5', '20', '30', 'MAXVALUE']
    partition_info = palo_client.PartitionInfo('k1', partition_name_list,
                                               partition_value_list)
    distribution_info = palo_client.DistributionInfo('HASH(k2)', 5)
    ret = client.create_table(table_name, DATA.partition_column_list, partition_info,
                              distribution_info, keys_desc='AGGREGATE KEY (k1, k2, k3, k4, k5)',
                              set_null=True)
    assert ret
    column_name_list = ['k1', 'k2', 'k3', 'k4', 'k5', 'v1', 'v2', 'v3', 'v4', 'v5', 'v6']
    column_from_path = ['city']
    set_list = ['k1=city']
    data_desc = palo_client.LoadDataInfo(FILE.parse_hdfs_file_path_empty, table_name, set_list=set_list,
                                         column_name_list=column_name_list, columns_from_path=column_from_path)
    ret = client.batch_load(util.get_label(), data_desc, is_wait=True, broker=broker_info)
    assert ret
    check_table = 'check_tb'
    init_check_table(database_name)
    sql1 = 'select * from %s order by 1, 2, 3' % table_name
    sql2 = 'select NULL, k2, k3, k4, k5, v1, v2, v3, v4, v5, v6 from %s order by k3' % check_table
    check2(sql1, sql2)
    client.clean(database_name)


def test_parse_column_wrong():
    """
    {
    "title": "test_sys_load_parse_from_path.test_parse_column_wrong",
    "describe": "parse duplicate column from load file; duplicate columns cause an error",
    "tag": "system,p1,fuzz"
    }
    """
    """
    parse duplicate column from load file
    duplicate columns cause an error
    """
    database_name, table_name, rollup_table_name = util.gen_name_list()
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    partition_name_list = ['partition_a', 'partition_b', 'partition_c', 'partition_d']
    partition_value_list = ['5', '20', '30', 'MAXVALUE']
    partition_info = palo_client.PartitionInfo('k1', partition_name_list,
                                               partition_value_list)
    distribution_info = palo_client.DistributionInfo('HASH(k2)', 5)
    ret = client.create_table(table_name, DATA.partition_column_list, partition_info,
                              distribution_info, keys_desc='AGGREGATE KEY (k1, k2, k3, k4, k5)')
    assert ret
    # k1 and k2 appear both in the file column list and in columns_from_path
    column_name_list = ['k1', 'k2', 'k3', 'k4', 'k5', 'v1', 'v2', 'v3', 'v4', 'v5', 'v6']
    column_from_path = ['k1', 'k2', 'city']
    data_desc = palo_client.LoadDataInfo(FILE.parse_hdfs_file_path_normal, table_name,
                                         column_name_list=column_name_list, columns_from_path=column_from_path)

    try:
        ret = client.batch_load(util.get_label(), data_desc, is_wait=True, broker=broker_info)
        assert 0 == 1, 'the load should be rejected because of duplicate columns'
    except Exception as e:
        print(str(e))
        assert 'Duplicate column' in str(e)
    client.clean(database_name)


def test_parse_column_wrong_1():
    """
    {
    "title": "test_sys_load_parse_from_path.test_parse_column_wrong_1",
    "describe": "parse a nonexistent column from the file path; the load fails",
    "tag": "system,p1,fuzz"
    }
    """
    """
    parse a nonexistent column from the file path
    the load fails
    """
    database_name, table_name, rollup_table_name = util.gen_name_list()
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    partition_name_list = ['partition_a', 'partition_b', 'partition_c', 'partition_d']
    partition_value_list = ['5', '20', '30', 'MAXVALUE']
    partition_info = palo_client.PartitionInfo('k1', partition_name_list,
                                               partition_value_list)
    distribution_info = palo_client.DistributionInfo('HASH(k2)', 5)
    ret = client.create_table(table_name, DATA.partition_column_list, partition_info,
                              distribution_info, keys_desc='AGGREGATE KEY (k1, k2, k3, k4, k5)',
                              set_null=True)
    assert ret
    # k5 is requested from the path, but the load file path does not provide it
    column_name_list = ['k1', 'k2', 'k3', 'k4', 'tmp_k5', 'v1', 'v2', 'v3', 'v4', 'v5', 'v6']
    column_from_path = ['k5']
    data_desc = palo_client.LoadDataInfo(FILE.parse_hdfs_file_path_empty, table_name,
                                         column_name_list=column_name_list,
                                         columns_from_path=column_from_path)
    ret = client.batch_load(util.get_label(), data_desc, is_wait=True, broker=broker_info)
    assert not ret
    client.clean(database_name)


def test_parse_column_filtered():
    """
    {
    "title": "test_sys_load_parse_from_path.test_parse_column_filtered",
    "describe": "parse a column whose value has no matching partition; the rows are filtered and the load fails",
    "tag": "system,p1,fuzz"
    }
    """
    """
    parse a column whose value has no matching partition
    the parsed value falls outside every partition, so the rows are filtered
    """
    database_name, table_name, rollup_table_name = util.gen_name_list()
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    partition_name_list = ['partition_a', 'partition_b', 'partition_c', 'partition_d']
    # all partition upper bounds are negative, so the k1 value parsed from the path
    # has no matching partition
    partition_value_list = ['-50', '-10', '-5', '-2']
    partition_info = palo_client.PartitionInfo('k1', partition_name_list,
                                               partition_value_list)
    distribution_info = palo_client.DistributionInfo('HASH(k2)', 5)
    ret = client.create_table(table_name, DATA.partition_column_list, partition_info,
                              distribution_info, keys_desc='AGGREGATE KEY (k1, k2, k3, k4, k5)',
                              set_null=True)
    assert ret
    column_name_list = ['tmp_k1', 'k2', 'k3', 'k4', 'k5', 'v1', 'v2', 'v3', 'v4', 'v5', 'v6']
    column_from_path = ['k1', 'city']
    data_desc = palo_client.LoadDataInfo(FILE.parse_hdfs_file_path_empty, table_name,
                                         column_name_list=column_name_list, columns_from_path=column_from_path)
    ret = client.batch_load(util.get_label(), data_desc, is_wait=True, broker=broker_info)
    assert not ret
    client.clean(database_name)


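# Note on strict_mode (hedged summary; see the Doris broker load documentation for the
# exact rule): with strict_mode=True, source values that fail type conversion are
# treated as error rows and the job fails once the error ratio is exceeded; with the
# default strict_mode=False such values are loaded as NULL or filtered, so the same
# data can fail under strict mode and succeed without it. The next case checks exactly
# that contrast.
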
def test_parse_wrong_type():
    """
    {
    "title": "test_sys_load_parse_from_path.test_parse_wrong_type",
    "describe": "test parse wrong type from file path; whether the load succeeds is controlled by strict_mode, which defaults to FALSE",
    "tag": "system,p1,fuzz"
    }
    """
    """
    test parsing a value of the wrong type from the file path
    whether the load succeeds is controlled by the strict_mode parameter, which defaults to FALSE
    """
    database_name, table_name, rollup_table_name = util.gen_name_list()
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    partition_name_list = ['partition_a', 'partition_b', 'partition_c', 'partition_d']
    partition_value_list = ['5', '20', '30', 'MAXVALUE']
    partition_info = palo_client.PartitionInfo('k1', partition_name_list,
                                               partition_value_list)
    distribution_info = palo_client.DistributionInfo('HASH(k2)', 5)
    ret = client.create_table(table_name, DATA.partition_column_list, partition_info,
                              distribution_info, keys_desc='AGGREGATE KEY (k1, k2, k3, k4, k5)',
                              set_null=True)
    assert ret
    column_name_list = ['k1', 'k2', 'k3', 'k4', 'tmp_k5', 'v1', 'v2', 'v3', 'v4', 'v5', 'v6']
    column_from_path = ['k5']
    data_desc = palo_client.LoadDataInfo(FILE.parse_hdfs_file_path_float, table_name,
                                         column_name_list=column_name_list,
                                         columns_from_path=column_from_path)
    # the value parsed from the path does not match the column type:
    # strict mode rejects the load, the default (non-strict) mode accepts it
    ret = client.batch_load(util.get_label(), data_desc, is_wait=True, broker=broker_info, strict_mode=True)
    assert not ret
    ret = client.batch_load(util.get_label(), data_desc, is_wait=True, broker=broker_info)
    assert ret
    client.clean(database_name)


def test_parse_float():
    """
    {
    "title": "test_sys_load_parse_from_path.test_parse_float",
    "describe": "parse a float from file path; parse a decimal value from the path",
    "tag": "system,p1"
    }
    """
    """
    parse a float from the file path
    parse a decimal value from the path
    """
    database_name, table_name, rollup_table_name = util.gen_name_list()
    client.clean(database_name)
    client.create_database(database_name)
    client.use(database_name)
    partition_name_list = ['partition_a', 'partition_b', 'partition_c', 'partition_d']
    partition_value_list = ['5', '20', '30', 'MAXVALUE']
    partition_info = palo_client.PartitionInfo('k1', partition_name_list,
                                               partition_value_list)
    distribution_info = palo_client.DistributionInfo('HASH(k2)', 5)
    ret = client.create_table(table_name, DATA.partition_column_list_parse, partition_info,
                              distribution_info, keys_desc='AGGREGATE KEY (k1, k2, k3, k4, k5_t)',
                              set_null=True)
    assert ret
    column_name_list = ['k1', 'k2', 'k3', 'k4', 'tmp_k5', 'v1', 'v2', 'v3', 'v4', 'v5', 'tmp_v6']
    column_from_path = ['k5']
    set_list = ['k5_t=tmp_k5, v6=k5']
    data_desc = palo_client.LoadDataInfo(FILE.parse_hdfs_file_path_float, table_name,
                                         column_name_list=column_name_list, set_list=set_list,
                                         columns_from_path=column_from_path)
    ret = client.batch_load(util.get_label(), data_desc, is_wait=True, broker=broker_info)
    assert ret
    check_table = 'check_tb'
    init_check_table(database_name)
    sql1 = 'select * from %s order by 1, 2, 3' % table_name
    sql2 = 'select k1, k2, k3, k4, k5, v1, v2, v3, v4, v5, 100.12345 from %s order by k3' % check_table
    check2(sql1, sql2)
    client.clean(database_name)


def teardown_module():
    """teardown"""
    pass


if __name__ == '__main__':
    setup_module()
    test_parse_float()