#!/bin/env python # -*- coding: utf-8 -*- # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. ############################################################################ # # @file test_array_ddl.py # @date 2022-08-15 11:09:53 # @brief This file is a test file for array type. # # BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, LARGEINT, FLOAT, DOUBLE, DECIMAL, DATE, DATETIME, CHAR, VARCHAR, STRING # 4种导入方式 insert/stream/broker/routine load # 4种导入格式 csv,json,parquet,orc # json导入参数 # max_filter_ratio # set column ############################################################################# """ test_array_load.py array routine load case in test_routine_load_property.py test_array_load """ import sys import os import time import pytest file_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")) sys.path.append(file_dir) file_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) sys.path.append(file_dir) from lib import palo_config from lib import palo_client from lib import palo_job from lib import util from lib import common from data import schema as SCHEMA from data import load_file as FILE from lib import palo_types config = palo_config.config broker_info = palo_client.BrokerInfo(config.broker_name, config.broker_property) # BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, LARGEINT, FLOAT, DOUBLE, DECIMAL, DATE, DATETIME, CHAR, VARCHAR, STRING # 4种导入方式 insert/stream/broker/routine load # 4种导入格式 csv,json,parquet,orc # json导入参数 # max_filter_ratio # set column def setup_module(): """enable array""" client = common.get_client() ret = client.show_variables('enable_vectorized_engine') if len(ret) == 1 and ret[0][1] == 'false': raise pytest.skip('skip if enable_vectorized_engine is false') ret = client.admin_show_config('enable_array_type') assert len(ret) == 1, 'get enable_array_type config error' value = palo_job.AdminShowConfig(ret[0]).get_value() if value != 'true': client.set_frontend_config('enable_array_type', 'true') print(len(client.get_alive_backend_list())) def teardown_module(): """tearDown""" client = common.get_client() print(len(client.get_alive_backend_list())) def load_check(db, tb, schema, local_file, hdfs_file, expect_file, **kwargs): """ 1. create db 2. create tb & stream load, skip if no local file 3. brokerte tb & load, skip if no broker file 4. check load result with expect file 5. drop db **kwargs pass to stream load & broker load """ client = common.create_workspace(db) stream_tb = tb + '_stream' broker_tb = tb + '_broker' k2_type = util.get_attr_condition_value(schema, 0, 'k2', 1) be_checked_table = list() # stream load if local_file is not None: ret = client.create_table(stream_tb, schema, keys_desc=SCHEMA.duplicate_key, set_null=True) assert ret, 'create table failed' ret = client.stream_load(stream_tb, local_file, max_filter_ratio=0.01, **kwargs) assert ret, 'stream load failed' be_checked_table.append(stream_tb) time.sleep(5) # broker load if hdfs_file is not None: ret = client.create_table(broker_tb, schema, keys_desc=SCHEMA.duplicate_key, set_null=True) assert ret, 'create table failed' load_data_info = palo_client.LoadDataInfo(hdfs_file, broker_tb, **kwargs) ret = client.batch_load(util.get_label(), load_data_info, broker=broker_info, max_filter_ratio=0.01, is_wait=True) assert ret, 'broker load failed' be_checked_table.append(broker_tb) # check if local_file and hdfs_file: sql1 = 'select * from %s order by k1' % stream_tb sql2 = 'select * from %s order by k1' % broker_tb common.check2(client, sql1, sql2=sql2) for tb in be_checked_table: if 'DOUBLE' in k2_type.upper() or 'FLOAT' in k2_type.upper(): common.check_by_file(expect_file, table_name=tb, client=client, k2=palo_types.ARRAY_DOUBLE) elif 'DECIMAL' in k2_type.upper(): common.check_by_file(expect_file, table_name=tb, client=client, k2=palo_types.ARRAY_DECIMAL) else: common.check_by_file(expect_file, table_name=tb, client=client) client.clean(db) def test_array_boolean(): """ { "title": "test_array_boolean", "describe": "array类型中,子类型为boolean,stream、broker导入csv,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_boolean_list local_file = FILE.test_array_boolean_local_file hdfs_file = FILE.test_array_boolean_remote_file expect_file = FILE.expe_array_boolean_file load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file) def test_array_boolean_json(): """ { "title": "test_array_boolean_json", "describe": "array类型中,子类型为boolean,stream导入json,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_boolean_list local_file = FILE.test_array_boolean_local_json expect_file = FILE.expe_array_boolean_file load_check(database_name, table_name, column_list, local_file, None, expect_file, format='json', strip_outer_array=True) def test_array_boolean_insert(): """ { "title": "test_array_boolean_insert", "describe": "array类型中,子类型为boolean,insert导入,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_boolean_list client = common.create_workspace(database_name) ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True) assert ret, 'create table failed' # insert array data sql_list = list() sql_list.append("insert into %s values (1000, ['1', '0'])" % table_name) sql_list.append("insert into %s values (1001, ['true', 'false'])" % table_name) sql_list.append("insert into %s values (1002, [2])" % table_name) sql_list.append("insert into %s values (1003, [NULL])" % table_name) sql_list.append("insert into %s values (1004, NULL)" % table_name) sql_list.append("insert into %s values (1005, ['Abc'])" % table_name) sql_list.append("insert into %s values (1006, [1.234])" % table_name) sql_list.append("insert into %s values (1007, '1,2,3')" % table_name) sql_list.append("insert into %s values (1008, '[1,2,3]')" % table_name) for i in sql_list: client.execute(i) sql = 'select * from %s order by k1' % table_name ret = client.execute(sql) expect = ((1000, u'[1, 0]'), (1001, u'[1, 0]'), (1002, u'[1]'), (1003, u'[NULL]'), (1004, None), (1005, u'[NULL]'), (1006, u'[1]'), (1007, None), (1008, u'[1, NULL, NULL]')) util.check(ret, expect) client.clean(database_name) def test_array_tinyint(): """ { "title": "test_array_tinyint", "describe": "array类型中,子类型为tinyint,stream、broker导入csv,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_tinyint_list local_file = FILE.test_array_tinyint_local_file hdfs_file = FILE.test_array_tinyint_remote_file expect_file = FILE.expe_array_tinyint_file load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file) def test_array_tinyint_json(): """ { "title": "test_array_tinyint_json", "describe": "array类型中,子类型为tinyint,stream导入json,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_tinyint_list local_file = FILE.test_array_tinyint_local_json expect_file = FILE.expe_array_tinyint_file load_check(database_name, table_name, column_list, local_file, None, expect_file, format='json', strip_outer_array=True) def test_array_tinyint_insert(): """ { "title": "test_array_tinyint_insert", "describe": "array类型中,子类型为tinyint,insert导入,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_tinyint_list client = common.create_workspace(database_name) ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True) assert ret, 'create table failed' sql_list = list() err_sql_list = list() sql_list.append("insert into %s values (1000, [])" % table_name) sql_list.append("insert into %s values (1001, [NULL])" % table_name) sql_list.append("insert into %s values (1002, [1, 0, -0])" % table_name) sql_list.append("insert into %s values (1003, [1.23, 2.34])" % table_name) sql_list.append("insert into %s values (1004, NULL)" % table_name) err_sql_list.append("insert into %s values (1005, [-129])" % table_name) err_sql_list.append("insert into %s values (1006, [128])" % table_name) sql_list.append("insert into %s values (1007, ['123', '23'])" % table_name) err_sql_list.append("insert into %s values (1008, ['ABC', '1'])" % table_name) sql_list.append("insert into %s values (1009, '1,2,3')" % table_name) sql_list.append("insert into %s values (1010, '[1,2,3]')" % table_name) for i in sql_list: client.execute(i) for sql in err_sql_list: util.assert_return(False, ' ', client.execute, sql) sql = 'select * from %s order by k1' % table_name ret = client.execute(sql) expect = ((1000, u'[]'), (1001, u'[NULL]'), (1002, u'[1, 0, 0]'), (1003, u'[1, 2]'), (1004, None), (1007, u'[123, 23]'), (1009, None), (1010, u'[1, 2, 3]')) util.check(ret, expect) client.clean(database_name) def test_array_smallint(): """ { "title": "test_array_smallint", "describe": "array类型中,子类型为smallint,stream、broker导入csv,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_smallint_list local_file = FILE.test_array_smallint_local_file hdfs_file = FILE.test_array_smallint_remote_file expect_file = FILE.expe_array_smallint_file load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file) def test_array_smallint_json(): """ { "title": "test_array_smallint_json", "describe": "array类型中,子类型为smallint,stream导入json,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_smallint_list local_file = FILE.test_array_smallint_local_json expect_file = FILE.expe_array_smallint_file load_check(database_name, table_name, column_list, local_file, None, expect_file, format='json', strip_outer_array=True) def test_array_smallint_insert(): """ { "title": "test_array_smallint_insert", "describe": "array类型中,子类型为smallint,insert导入,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_smallint_list client = common.create_workspace(database_name) ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True) assert ret, 'create table failed' # insert sql_list = list() err_sql_list = list() sql_list.append("insert into %s values (1000, [])" % table_name) sql_list.append("insert into %s values (1001, [NULL])" % table_name) sql_list.append("insert into %s values (1002, [1, 0, -0])" % table_name) sql_list.append("insert into %s values (1003, [1.23, 2.34])" % table_name) sql_list.append("insert into %s values (1004, NULL)" % table_name) err_sql_list.append("insert into %s values (1005, [-32769])" % table_name) err_sql_list.append("insert into %s values (1006, [32768])" % table_name) sql_list.append("insert into %s values (1007, ['123', '23'])" % table_name) err_sql_list.append("insert into %s values (1008, ['ABC', '1'])" % table_name) sql_list.append("insert into %s values (1009, '1,2,3')" % table_name) sql_list.append("insert into %s values (1010, '[1,2,3]')" % table_name) for i in sql_list: client.execute(i) for sql in err_sql_list: util.assert_return(False, ' ', client.execute, sql) sql = 'select * from %s order by k1' % table_name ret = client.execute(sql) expect = ((1000, u'[]'), (1001, u'[NULL]'), (1002, u'[1, 0, 0]'), (1003, u'[1, 2]'), (1004, None), (1007, u'[123, 23]'), (1009, None), (1010, u'[1, 2, 3]')) util.check(ret, expect) client.clean(database_name) def test_array_int(): """ { "title": "test_array_int", "describe": "array类型中,子类型为int,stream、broker导入csv,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_int_list local_file = FILE.test_array_int_local_file hdfs_file = FILE.test_array_int_remote_file expect_file = FILE.expe_array_int_file load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file) def test_array_int_json(): """ { "title": "test_array_int_json", "describe": "array类型中,子类型为intt,stream导入json,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_int_list local_file = FILE.test_array_int_local_json expect_file = FILE.expe_array_int_file load_check(database_name, table_name, column_list, local_file, None, expect_file, format='json', strip_outer_array=True) def test_array_int_insert(): """ { "title": "test_array_int_insert", "describe": "array类型中,子类型为int,insert导入查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_int_list client = common.create_workspace(database_name) ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True) assert ret, 'create table failed' # insert sql_list = list() err_sql_list = list() sql_list.append("insert into %s values (1000, [])" % table_name) sql_list.append("insert into %s values (1001, [NULL])" % table_name) sql_list.append("insert into %s values (1002, [1, 0, -0])" % table_name) sql_list.append("insert into %s values (1003, [1.23, 2.34])" % table_name) sql_list.append("insert into %s values (1004, NULL)" % table_name) err_sql_list.append("insert into %s values (1005, [-2147483649])" % table_name) err_sql_list.append("insert into %s values (1006, [2147483648])" % table_name) sql_list.append("insert into %s values (1007, ['123', '23'])" % table_name) err_sql_list.append("insert into %s values (1008, ['ABC', '1'])" % table_name) sql_list.append("insert into %s values (1009, '1,2,3')" % table_name) sql_list.append("insert into %s values (1010, '[1,2,3]')" % table_name) err_sql_list.append("insert into %s values (1005, [-9223372036854775809])" % table_name) for i in sql_list: client.execute(i) for sql in err_sql_list: util.assert_return(False, ' ', client.execute, sql) sql = 'select * from %s order by k1' % table_name ret = client.execute(sql) expect = ((1000, u'[]'), (1001, u'[NULL]'), (1002, u'[1, 0, 0]'), (1003, u'[1, 2]'), (1004, None), (1007, u'[123, 23]'), (1009, None), (1010, u'[1, 2, 3]')) util.check(ret, expect) client.clean(database_name) def test_array_bigint(): """ { "title": "test_array_bigint", "describe": "array类型中,子类型为bigint,stream、broker导入csv,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_bigint_list local_file = FILE.test_array_bigint_local_file hdfs_file = FILE.test_array_bigint_remote_file expect_file = FILE.expe_array_bigint_file load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file) def test_array_bigint_json(): """ { "title": "test_array_bigint_json", "describe": "array类型中,子类型为bigint,stream导入json,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_bigint_list local_file = FILE.test_array_bigint_local_json expect_file = FILE.expe_array_bigint_file load_check(database_name, table_name, column_list, local_file, None, expect_file, format='json', strip_outer_array=True) def test_array_bigint_insert(): """ { "title": "test_array_bigint_insert", "describe": "array类型中,子类型为bigint,insert导入,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_bigint_list client = common.create_workspace(database_name) ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True) assert ret, 'create table failed' # insert sql_list = list() err_sql_list = list() sql_list.append("insert into %s values (1000, [])" % table_name) sql_list.append("insert into %s values (1001, [NULL])" % table_name) sql_list.append("insert into %s values (1002, ['1002', '1002'])" % table_name) err_sql_list.append("insert into %s values (1003, [1, 'abc'])" % table_name) sql_list.append("insert into %s values (1004, [1, 3.1415, -0])" % table_name) sql_list.append("insert into %s values (1005, NULL)" % table_name) sql_list.append("insert into %s values (1006, '1,2,3')" % table_name) err_sql_list.append("insert into %s values (1007, [-9223372036854775809])" % table_name) err_sql_list.append("insert into %s values (1008, [9223372036854775808])" % table_name) sql_list.append("insert into %s values (1009, [-9223372036854775808, 9223372036854775807])" % table_name) sql_list.append("insert into %s values (1010, '[1,2,3]')" % table_name) for i in sql_list: client.execute(i) for sql in err_sql_list: util.assert_return(False, ' ', client.execute, sql) sql = 'select * from %s order by k1' % table_name ret = client.execute(sql) expect = ((1000, u'[]'), (1001, u'[NULL]'), (1002, u'[1002, 1002]'), (1004, u'[1, 3, 0]'), (1005, None), (1006, None), (1009, u'[-9223372036854775808, 9223372036854775807]'), (1010, u'[1, 2, 3]')) util.check(ret, expect) client.clean(database_name) def test_array_largeint(): """ { "title": "test_array_largeint_json", "describe": "array类型中,子类型为largeint,stream、broker导入csv,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_largeint_list local_file = FILE.test_array_largeint_local_file hdfs_file = FILE.test_array_largeint_remote_file expect_file = FILE.expe_array_largeint_file load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file) def test_array_largeint_json(): """ { "title": "test_array_largeint_json", "describe": "array类型中,子类型为boolean,stream导入json,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_largeint_list local_file = FILE.test_array_largeint_local_json expect_file = FILE.expe_array_largeint_file load_check(database_name, table_name, column_list, local_file, None, expect_file, format='json', strip_outer_array=True) def test_array_largeint_json_num(): """ { "title": "test_array_largeint_json", "describe": "array类型中,子类型为largeint,stream导入json,largeint为数字非字符串,导入成功,结果为Null", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_largeint_list local_file = FILE.test_array_largeint_local_json_num expect_file = FILE.expe_array_largeint_json_num_file_null load_check(database_name, table_name, column_list, local_file, None, expect_file, format='json') def test_array_largeint_json_num_as_string(): """ { "title": "test_array_largeint_json", "describe": "array类型中,子类型为largeint,stream导入json,设置num_as_string,largeint为数字非字符串,导入成功,结果为Null", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_largeint_list local_file = FILE.test_array_largeint_local_json_num expect_file = FILE.expe_array_largeint_json_num_file_real load_check(database_name, table_name, column_list, local_file, None, expect_file, format='json', num_as_string='true') def test_array_largeint_insert(): """ { "title": "test_array_largeint_insert", "describe": "array类型中,子类型为largeint,insert导入,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_largeint_list client = common.create_workspace(database_name) ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True) assert ret, 'create table failed' # insert sql_list = list() err_sql_list = list() sql_list.append("insert into %s values (1000, [])" % table_name) sql_list.append("insert into %s values (1001, [NULL])" % table_name) sql_list.append("insert into %s values (1002, [1, 0, -0])" % table_name) sql_list.append("insert into %s values (1003, [1.23, 2.34])" % table_name) sql_list.append("insert into %s values (1004, NULL)" % table_name) err_sql_list.append("insert into %s values (1005, [-170141183460469231731687303715884105729])" % table_name) err_sql_list.append("insert into %s values (1006, [170141183460469231731687303715884105728])" % table_name) sql_list.append("insert into %s values (1007, ['123', '23'])" % table_name) err_sql_list.append("insert into %s values (1008, ['ABC', '1'])" % table_name) sql_list.append("insert into %s values (1009, '1,2,3')" % table_name) sql_list.append("insert into %s values (1010, '[1,2,3]')" % table_name) for i in sql_list: client.execute(i) for sql in err_sql_list: util.assert_return(False, ' ', client.execute, sql) sql = 'select * from %s order by k1' % table_name ret = client.execute(sql) expect = ((1000, u'[]'), (1001, u'[NULL]'), (1002, u'[1, 0, 0]'), (1003, u'[1, 2]'), (1004, None), (1007, u'[123, 23]'), (1009, None), (1010, u'[1, 2, 3]')) util.check(ret, expect) client.clean(database_name) def test_array_decimal(): """ { "title": "test_array_decimal", "describe": "array类型中,子类型为decimal,stream、broker导入csv,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_decimal_list local_file = FILE.test_array_decimal_local_file hdfs_file = FILE.test_array_decimal_remote_file expect_file = FILE.expe_array_decimal_file load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file) def test_array_decimal_json(): """ { "title": "test_array_decimal_json", "describe": "array类型中,子类型为decimal,stream导入json,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_decimal_list local_file = FILE.test_array_decimal_local_json expect_file = FILE.expe_array_decimal_file load_check(database_name, table_name, column_list, local_file, None, expect_file, format='json', strip_outer_array=True) def test_array_decimal_insert(): """ { "title": "test_array_decimal_insert", "describe": "array类型中,子类型为decimal,insert导入,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_decimal_list client = common.create_workspace(database_name) ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True) assert ret, 'create table failed' # insert sql_list = list() err_sql_list = list() sql_list.append("insert into %s values (1000, [123, 3, 4])" % table_name) sql_list.append("insert into %s values (1001, [])" % table_name) sql_list.append("insert into %s values (1002, [NULL])" % table_name) sql_list.append("insert into %s values (1003, ['-1.234', '2.3456'])" % table_name) sql_list.append("insert into %s values (1004, [1.0000000001])" % table_name) sql_list.append("insert into %s values (1005, [10000000000000000000.1111111222222222])" % table_name) sql_list.append("insert into %s values (1006, '1,3,4')" % table_name) err_sql_list.append("insert into %s values (1007, ['ABCD'])" % table_name) sql_list.append("insert into %s values (1008, NULL)" % table_name) sql_list.append("insert into %s values (1009, [999999999999999999.9999999])" % table_name) sql_list.append("insert into %s values (1010, '[1,2,3]')" % table_name) for i in sql_list: client.execute(i) for sql in err_sql_list: util.assert_return(False, ' ', client.execute, sql) sql = 'select * from %s order by k1' % table_name ret = client.execute(sql) expect = ((1000, u'[123, 3, 4]'), (1001, u'[]'), (1002, u'[NULL]'), (1003, u'[-1.234, 2.3456]'), (1004, u'[1]'), (1005, u'[100000000000000000.001111111]'), # 1005 decimal is wrong, overflow (1006, None), (1008, None), (1009, u'[999999999999999999.9999999]'), (1010, u'[1, 2, 3]')) util.check(ret, expect) client.clean(database_name) def test_array_float(): """ { "title": "test_array_float", "describe": "array类型中,子类型为float,stream、broker导入csv,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_float_list local_file = FILE.test_array_float_local_file hdfs_file = FILE.test_array_float_remote_file expect_file = FILE.expe_array_float_file load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file) def test_array_float_json(): """ { "title": "test_array_float_json", "describe": "array类型中,子类型为float,stream导入json,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_float_list local_file = FILE.test_array_float_local_json expect_file = FILE.expe_array_float_file load_check(database_name, table_name, column_list, local_file, None, expect_file, format='json', strip_outer_array=True) def test_array_float_insert(): """ { "title": "test_array_float_insert", "describe": "array类型中,子类型为float,insert导入,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_float_list client = common.create_workspace(database_name) ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True) assert ret, 'create table failed' # insert sql_list = list() err_sql_list = list() sql_list.append("insert into %s values (1000, [123, 3, 4])" % table_name) sql_list.append("insert into %s values (1001, [])" % table_name) sql_list.append("insert into %s values (1002, [NULL])" % table_name) sql_list.append("insert into %s values (1003, ['-1.234', '2.3456'])" % table_name) sql_list.append("insert into %s values (1004, [1.0000000001])" % table_name) sql_list.append("insert into %s values (1005, [10000000000000000000.1111111222222222])" % table_name) sql_list.append("insert into %s values (1006, '1,3,4')" % table_name) err_sql_list.append("insert into %s values (1007, ['ABCD'])" % table_name) sql_list.append("insert into %s values (1008, NULL)" % table_name) sql_list.append("insert into %s values (1009, [999999999999999999.9999999])" % table_name) sql_list.append("insert into %s values (1010, '[1,2,3]')" % table_name) for i in sql_list: client.execute(i) for sql in err_sql_list: util.assert_return(False, ' ', client.execute, sql) sql = 'select * from %s order by k1' % table_name ret = client.execute(sql) expect = ((1000, u'[123, 3, 4]'), (1001, u'[]'), (1002, u'[NULL]'), (1003, u'[-1.234, 2.3456]'), (1004, u'[1]'), (1005, u'[1e+19]'), (1006, None), (1008, None), (1009, u'[1e+18]'), (1010, u'[1, 2, 3]')) util.check(ret, expect) client.clean(database_name) def test_array_double(): """ { "title": "test_array_double", "describe": "array类型中,子类型为double,stream、broker导入csv,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_double_list local_file = FILE.test_array_double_local_file hdfs_file = FILE.test_array_double_remote_file expect_file = FILE.expe_array_double_file load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file) def test_array_double_json(): """ { "title": "test_array_double_json", "describe": "array类型中,子类型为double,stream导入json,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_double_list local_file = FILE.test_array_double_local_json expect_file = FILE.expe_array_double_file load_check(database_name, table_name, column_list, local_file, None, expect_file, format='json', strip_outer_array=True) def test_array_double_insert(): """ { "title": "test_array_double_insert", "describe": "array类型中,子类型为double,insert导入,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_double_list client = common.create_workspace(database_name) ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True) assert ret, 'create table failed' # insert sql_list = list() err_sql_list = list() sql_list.append("insert into %s values (1000, [123, 3, 4])" % table_name) sql_list.append("insert into %s values (1001, [])" % table_name) sql_list.append("insert into %s values (1002, [NULL])" % table_name) sql_list.append("insert into %s values (1003, ['-1.234', '2.3456'])" % table_name) sql_list.append("insert into %s values (1004, [1.0000000001])" % table_name) sql_list.append("insert into %s values (1005, [10000000000000000000.1111111222222222])" % table_name) sql_list.append("insert into %s values (1006, '1,3,4')" % table_name) err_sql_list.append("insert into %s values (1007, ['ABCD'])" % table_name) sql_list.append("insert into %s values (1008, NULL)" % table_name) sql_list.append("insert into %s values (1009, [999999999999999999.9999999])" % table_name) sql_list.append("insert into %s values (1010, '[1,2,3]')" % table_name) for i in sql_list: client.execute(i) for sql in err_sql_list: util.assert_return(False, ' ', client.execute, sql) sql = 'select * from %s order by k1' % table_name ret = client.execute(sql) expect = ((1000, u'[123, 3, 4]'), (1001, u'[]'), (1002, u'[NULL]'), (1003, u'[-1.234, 2.3456]'), (1004, u'[1.0000000001]'), (1005, u'[1e+19]'), (1006, None), (1008, None), (1009, u'[1e+18]'), (1010, u'[1, 2, 3]')) util.check(ret, expect) client.clean(database_name) def test_array_date(): """ { "title": "test_array_date", "describe": "array类型中,子类型为date,stream、broker导入csv,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_date_list local_file = FILE.test_array_date_local_file hdfs_file = FILE.test_array_date_remote_file expect_file = FILE.expe_array_date_file load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file) def test_array_date_json(): """ { "title": "test_array_date_json", "describe": "array类型中,子类型为date,stream导入json,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_date_list local_file = FILE.test_array_date_local_json expect_file = FILE.expe_array_date_file load_check(database_name, table_name, column_list, local_file, None, expect_file, format='json', strip_outer_array=True) def test_array_date_insert(): """ { "title": "test_array_date_insert", "describe": "array类型中,子类型为date,insert导入,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_date_list client = common.create_workspace(database_name) ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True) assert ret, 'create table failed' # insert sql_list = list() err_sql_list = list() sql_list.append("insert into %s values (1000, ['0001-01-01', '9999-12-31'])" % table_name) sql_list.append("insert into %s values (1001, ['10000-01-01'])" % table_name) sql_list.append("insert into %s values (1002, ['20200202'])" % table_name) sql_list.append("insert into %s values (1003, ['2020-01-01 12:23:12'])" % table_name) sql_list.append("insert into %s values (1004, ['2020/12/12'])" % table_name) err_sql_list.append("insert into %s values (1005, [2020-01-01])" % table_name) sql_list.append("insert into %s values (1006, [])" % table_name) sql_list.append("insert into %s values (1007, [NULL])" % table_name) sql_list.append("insert into %s values (1008, NULL)" % table_name) sql_list.append("insert into %s values (1009, ['ABC', 'DEF'])" % table_name) err_sql_list.append("insert into %s values (1010, 123)" % table_name) for i in sql_list: client.execute(i) for sql in err_sql_list: util.assert_return(False, ' ', client.execute, sql) sql = 'select * from %s order by k1' % table_name ret = client.execute(sql) expect = ((1000, u'[0001-01-01, 9999-12-31]'), (1001, u'[NULL]'), (1002, u'[2020-02-02]'), (1003, u'[2020-01-01]'), (1004, u'[2020-12-12]'), (1006, u'[]'), (1007, u'[NULL]'), (1008, None), (1009, u'[NULL, NULL]')) util.check(ret, expect) client.clean(database_name) def test_array_datetime(): """ { "title": "test_array_datetime", "describe": "array类型中,子类型为datetime,stream、broker导入csv,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_datetime_list local_file = FILE.test_array_datetime_local_file hdfs_file = FILE.test_array_datetime_remote_file expect_file = FILE.expe_array_datetime_file load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file) def test_array_datetime_json(): """ { "title": "test_array_datetime_json", "describe": "array类型中,子类型为datetime,stream导入json,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_datetime_list local_file = FILE.test_array_datetime_local_json expect_file = FILE.expe_array_datetime_file load_check(database_name, table_name, column_list, local_file, None, expect_file, format='json', strip_outer_array=True) def test_array_datetime_insert(): """ { "title": "test_array_datetime_insert", "describe": "array类型中,子类型为datetime,insert导入,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_datetime_list client = common.create_workspace(database_name) ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True) assert ret, 'create table failed' # insert sql_list = list() err_sql_list = list() sql_list.append("insert into %s values (1000, [])" % table_name) sql_list.append("insert into %s values (1001, [NULL])" % table_name) sql_list.append("insert into %s values (1002, ['0001-01-01 00:00:00', '9999-12-31 23:59:58'])" % table_name) sql_list.append("insert into %s values (1003, ['2020-01-01'])" % table_name) sql_list.append("insert into %s values (1004, ['2020-13-01'])" % table_name) sql_list.append("insert into %s values (1005, ['0000-00-00 00:00:00'])" % table_name) sql_list.append("insert into %s values (1006, ['2020/01/01 00-00-00'])" % table_name) sql_list.append("insert into %s values (1007, ['20200101120000', 20200101120000])" % table_name) sql_list.append("insert into %s values (1008, NULL)" % table_name) sql_list.append("insert into %s values (1009, ['ABCD', 'AM'])" % table_name) sql_list.append("insert into %s values (1010, '1,2,3')" % table_name) sql_list.append("insert into %s values (1011, ['1975-06-18 01:47:17'])" % table_name) sql_list.append("insert into %s values (1012, '[0001-01-01 00:00:00, 9999-12-31 23:59:58]')" % table_name) for i in sql_list: client.execute(i) for sql in err_sql_list: util.assert_return(False, ' ', client.execute, sql) sql = 'select * from %s order by k1' % table_name ret = client.execute(sql) expect = ((1000, u'[]'), (1001, u'[NULL]'), (1002, u'[0001-01-01 00:00:00, 9999-12-31 23:59:58]'), (1003, u'[2020-01-01 00:00:00]'), (1004, u'[NULL]'), (1005, u'[NULL]'), (1006, u'[2020-01-01 00:00:00]'), (1007, u'[2020-01-01 12:00:00, 2020-01-01 12:00:00]'), (1008, None), (1009, u'[NULL, NULL]'), (1010, None), (1011, u'[1975-06-18 01:47:17]'), (1012, u'[0001-01-01 00:00:00, 9999-12-31 23:59:58]')) util.check(ret, expect) client.clean(database_name) def test_array_char(): """ { "title": "test_array_char", "describe": "array类型中,子类型为char,stream、broker导入csv,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_char_list local_file = FILE.test_array_char_local_file hdfs_file = FILE.test_array_char_remote_file expect_file = FILE.expe_array_char_file load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file) @pytest.mark.skip() def test_array_char_json(): """ { "title": "test_array_char_json", "describe": "array类型中,子类型为char,stream导入json,导入成功,查询结果正确.", "tag": "function,p0" } """ # todo: 存在json转义字符问题 database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_char_list local_file = FILE.test_array_char_local_json expect_file = FILE.expe_array_char_file load_check(database_name, table_name, column_list, local_file, None, expect_file, format='json', strip_outer_array=True) @pytest.mark.skip() def test_array_char_insert(): """ { "title": "test_array_char_insert", "describe": "array类型中,子类型为char,insert导入,查询结果正确", "tag": "function,p0" } """ # todo: strict为true未过滤,不生效 database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_char_list client = common.create_workspace(database_name) client.set_variables('enable_insert_strict', 'true') ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True) assert ret, 'create table failed' # insert sql_list = list() err_sql_list = list() sql_list.append("insert into %s values (1000, [''])" % table_name) sql_list.append("insert into %s values (1001, [NULL])" % table_name) sql_list.append("insert into %s values (1002, NULL)" % table_name) sql_list.append("insert into %s values (1003, ['true', 'false', null])" % table_name) err_sql_list.append("insert into %s values (1004, [1, 2, 3])" % table_name) sql_list.append("insert into %s values (1005, ['中文验证', '测试', '1kdads', '空 格'])" % table_name) sql_list.append("insert into %s values (1006, ['!@#$^&*()', '#$<>:\\'{},'])" % table_name) sql_list.append("insert into %s values (1009, ['hello', 'word'])" % table_name) sql_list.append("insert into %s values (1010, '[]')" % table_name) err_sql_list.append("insert into %s values (1004, [1, 2, 3])" % table_name) err_sql_list.append("insert into %s values (1007, ['too long:%s', 'ttds'])" % (table_name, 'changdu' * 50)) sql_list.append("insert into %s values (1008, 'ddate')" % table_name) # ?? for i in sql_list: client.execute(i) sql = 'select * from %s order by k1' % table_name ret = client.execute(sql) expect = ((1000, u"['']"), (1001, u'[NULL]'), (1002, None), (1003, u"['true', 'false', NULL]"), (1005, u"['\u4e2d\u6587\u9a8c\u8bc1', '\u6d4b\u8bd5', '1kdads', '\u7a7a \u683c']"), (1006, u"['!@#$^&*()', '#$<>:'{},']"), (1008, None), (1009, u"['hello', 'word']"), (1010, u'[]')) util.check(ret, expect, True) for sql in err_sql_list: util.assert_return(False, ' ', client.execute, sql) # strict关闭,之前不支持的sql导入成功 client.set_variables('enable_insert_strict', 'false') err_sql_list = list() err_sql_list.append("insert into %s values (1007, ['too long:%s', 'ttds'])" % (table_name, 'changdu' * 50)) err_sql_list.append("insert into %s values (1008, 'ddate')" % table_name) for sql in err_sql_list: util.assert_return(True, ' ', client.execute, sql) sql = 'select * from %s where k1 in (1007, 1008)' % table_name ret = client.execute(sql) expect = ((1008, None),) util.check(ret, expect) client.clean(database_name) def test_array_varchar(): """ { "title": "test_array_varchar", "describe": "array类型中,子类型为varchar,stream、broker导入csv,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_varchar_list local_file = FILE.test_array_varchar_local_file hdfs_file = FILE.test_array_varchar_remote_file expect_file = FILE.expe_array_varchar_file load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file) def test_array_varchar_json(): """ { "title": "test_array_varchar_json", "describe": "array类型中,子类型为varchar,stream导入json,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_varchar_list local_file = FILE.test_array_varchar_local_json expect_file = FILE.expe_array_varchar_file load_check(database_name, table_name, column_list, local_file, None, expect_file, format='json', strip_outer_array=True) def test_array_varchar_insert(): """ { "title": "test_array_varchar_insert", "describe": "array类型中,子类型为varchar,insert导入,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_varchar_list client = common.create_workspace(database_name) ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True) assert ret, 'create table failed' sql_list = list() err_sql_list = list() sql_list.append("insert into %s values (1000, [''])" % table_name) sql_list.append("insert into %s values (1001, [NULL])" % table_name) sql_list.append("insert into %s values (1002, NULL)" % table_name) sql_list.append("insert into %s values (1003, ['true', 'false', null])" % table_name) sql_list.append("insert into %s values (1004, [1, 2, 3])" % table_name) sql_list.append("insert into %s values (1005, ['中文验证', '测试', '1kdads', '空 格'])" % table_name) sql_list.append("insert into %s values (1006, ['!@#$^&*()', '#$<>:\\'{},'])" % table_name) err_sql_list.append("insert into %s values (1007, ['too long:%s']" % (table_name, 'changdu' * 50)) sql_list.append("insert into %s values (1008, 'ddate')" % table_name) sql_list.append("insert into %s values (1009, ['hello', 'word'])" % table_name) sql_list.append("insert into %s values (1010, '[]')" % table_name) err_sql_list.append("insert into %s values (1011, {'test'})" % table_name) err_sql_list.append("insert into %s values (1012, [{},{}])" % table_name) err_sql_list.append("insert into %s values (1013, [[],[]])" % table_name) for i in sql_list: client.execute(i) for sql in err_sql_list: util.assert_return(False, ' ', client.execute, sql) sql = 'select * from %s order by k1' % table_name ret = client.execute(sql) expect = ((1000, u"['']"), (1001, u'[NULL]'), (1002, None), (1003, u"['true', 'false', NULL]"), (1004, u"['1', '2', '3']"), (1005, u"['\u4e2d\u6587\u9a8c\u8bc1', '\u6d4b\u8bd5', '1kdads', '\u7a7a \u683c']"), (1006, u"['!@#$^&*()', '#$<>:'{},']"), (1008, None), (1009, u"['hello', 'word']"), (1010, u'[]')) util.check(ret, expect) client.clean(database_name) def test_array_string(): """ { "title": "test_array_string", "describe": "array类型中,子类型为string,stream、broker导入csv,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_string_list local_file = FILE.test_array_varchar_local_file hdfs_file = FILE.test_array_varchar_remote_file expect_file = FILE.expe_array_string_file load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file) def test_array_string_json(): """ { "title": "test_array_string_json", "describe": "array类型中,子类型为string,stream导入json,导入成功,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_string_list local_file = FILE.test_array_varchar_local_json expect_file = FILE.expe_array_string_file load_check(database_name, table_name, column_list, local_file, None, expect_file, format='json', strip_outer_array=True) def test_array_string_insert(): """ { "title": "test_array_string_insert", "describe": "array类型中,子类型为string,insert导入,查询结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() column_list = SCHEMA.array_string_list client = common.create_workspace(database_name) ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True) assert ret, 'create table failed' # insert sql_list = list() err_sql_list = list() sql_list.append("insert into %s values (1000, [''])" % table_name) sql_list.append("insert into %s values (1001, [NULL])" % table_name) sql_list.append("insert into %s values (1002, NULL)" % table_name) sql_list.append("insert into %s values (1003, ['true', 'false', null])" % table_name) sql_list.append("insert into %s values (1004, [1, 2, 3])" % table_name) sql_list.append("insert into %s values (1005, ['中文验证', '测试', '1kdads', '空 格'])" % table_name) sql_list.append("insert into %s values (1006, ['!@#$^&*()', '#$<>:\\'{},'])" % table_name) sql_list.append("insert into %s values (1007, ['long:%s'])" % (table_name, 'changdu' * 50)) sql_list.append("insert into %s values (1008, 'ddate')" % table_name) sql_list.append("insert into %s values (1009, ['hello', 'word'])" % table_name) sql_list.append("insert into %s values (1010, '[]')" % table_name) err_sql_list.append("insert into %s values (1011, {'test'})" % table_name) err_sql_list.append("insert into %s values (1012, [{},{}])" % table_name) err_sql_list.append("insert into %s values (1013, [[],[]])" % table_name) for i in sql_list: client.execute(i) for sql in err_sql_list: util.assert_return(False, ' ', client.execute, sql) sql = 'select * from %s order by k1' % table_name ret = client.execute(sql) expect = ((1000, u"['']"), (1001, u'[NULL]'), (1002, None), (1003, u"['true', 'false', NULL]"), (1004, u"['1', '2', '3']"), (1005, u"['\u4e2d\u6587\u9a8c\u8bc1', '\u6d4b\u8bd5', '1kdads', '\u7a7a \u683c']"), (1006, u"['!@#$^&*()', '#$<>:'{},']"), (1007, u"['long:%s']" % ('changdu' * 50)), (1008, None), (1009, u"['hello', 'word']"), (1010, u'[]')) util.check(ret, expect) client.clean(database_name) def test_array_parquet(): """ { "title": "test_array_parquet", "describe": "parquet文件,列类型一个为string,一个是hive生成的array复杂类型列,分别导入,不支持hive生成的格式,但be不core", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() client = common.create_workspace(database_name) ret = client.create_table(table_name, SCHEMA.array_table_list, keys_desc=SCHEMA.duplicate_key, set_null=True) assert ret, 'create table failed' load_data_info = palo_client.LoadDataInfo(FILE.test_array_table_remote_parquet_string, table_name, format_as='parquet') ret = client.batch_load(util.get_label(), load_data_info, broker=broker_info, is_wait=True) assert ret, 'broker load failed' assert client.verify(FILE.expe_array_table_file, table_name) column_name_list = util.get_attr(SCHEMA.array_table_list, 0) load_data_info = palo_client.LoadDataInfo(FILE.test_array_table_remote_parquet_hive, table_name, format_as='parquet', column_name_list=column_name_list) ret = client.batch_load(util.get_label(), load_data_info, broker=broker_info, is_wait=True) assert not ret, 'expect broker load failed' client.clean(database_name) def test_array_orc(): """ { "title": "test_array_orc", "describe": "orc文件,列类型一个为string,一个是hive生成的array类型列,分别导入,支持hive生成的格式,todo check", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() client = common.create_workspace(database_name) ret = client.create_table(table_name, SCHEMA.array_table_list, keys_desc=SCHEMA.duplicate_key, set_null=True) assert ret, 'create table failed' load_data_info = palo_client.LoadDataInfo(FILE.test_array_table_remote_orc_string, table_name, format_as='orc') ret = client.batch_load(util.get_label(), load_data_info, broker=broker_info, is_wait=True) assert ret, 'broker load faled' assert client.verify(FILE.expe_array_table_file, table_name) load_data_info = palo_client.LoadDataInfo(FILE.test_array_table_remote_parquet_hive, table_name, format_as='orc') ret = client.batch_load(util.get_label(), load_data_info, broker=broker_info, is_wait=True) assert not ret, 'expect broker load failed' assert client.truncate(table_name), 'truncate table failed' load_data_info = palo_client.LoadDataInfo(FILE.test_array_table_remote_orc_hive, table_name, format_as='orc') ret = client.batch_load(util.get_label(), load_data_info, broker=broker_info, is_wait=True) assert ret, 'expect broker load failed' # assert client.verify(FILE.expe_array_table_file, table_name) # client.clean(database_name) def test_array_json(): """ { "title": "test_array_orc", "describe": "json文件导入,成功,结果正确", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() client = common.create_workspace(database_name) ret = client.create_table(table_name, SCHEMA.array_table_list, keys_desc=SCHEMA.duplicate_key, set_null=True) assert ret, 'create table failed' ret = client.stream_load(table_name, FILE.test_array_table_local_json, format='json', strip_outer_array=True, num_as_string='true') assert ret, 'stream load failed' common.check_by_file(FILE.expe_array_table_file, table_name, client=client, a7=palo_types.ARRAY_DECIMAL, a8=palo_types.ARRAY_FLOAT, a9=palo_types.ARRAY_DOUBLE) client.clean(database_name) def test_array_strict(): """ { "title": "test_array_strict", "describe": "验证strict模式,分别为not Null列,strict为true,strict为false,构造错误数据", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() client = common.create_workspace(database_name) schema = SCHEMA.array_int_list tb_n = table_name + '_n' # not null tb_s = table_name + '_s' # strict true tb_ns = table_name + '_ns' # strict false ret = client.create_table(tb_s, schema, keys_desc=SCHEMA.duplicate_key, set_null=True) assert ret, 'create table failed' ret = client.create_table(tb_ns, schema, keys_desc=SCHEMA.duplicate_key, set_null=True) assert ret, 'create table failed' ret = client.create_table(tb_n, schema, keys_desc=SCHEMA.duplicate_key, set_null=False) assert ret, 'create table failed' ret = client.stream_load(tb_n, FILE.test_array_mix_file_local, max_filter_ratio=1, column_separator='|') assert ret, 'stream load failed' ret = client.stream_load(tb_s, FILE.test_array_mix_file_local, strict_mode='true', max_filter_ratio=1, column_separator='|') assert ret, 'stream load failed' ret = client.stream_load(tb_ns, FILE.test_array_mix_file_local, strict_mode='false', max_filter_ratio=1, column_separator='|') assert ret, 'stream load failed' tb_i = table_name + '_i' # insert test ret = client.create_table(tb_i, schema, keys_desc=SCHEMA.duplicate_key, set_null=True) assert ret, 'create table failed' insert_sql = 'insert into %s values(1, "abc")' % tb_i # todo # client.set_variables('enable_insert_strict', 'true') # util.assert_return(False, ' ', client.execute, insert_sql) client.set_variables('enable_insert_strict', 'false') util.assert_return(True, None, client.execute, insert_sql) # client.clean(database_name) def test_array_stream_param(): """ { "title": "test_array_stream_param", "describe": "导入时,设置的一些参数,如set,where,columns,partition", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() client = common.create_workspace(database_name) check_tb = 'check_tb' ret = client.create_table(check_tb, SCHEMA.array_table_list, keys_desc=SCHEMA.duplicate_key) assert ret, 'create table failed' ret = client.stream_load(check_tb, FILE.test_array_table_local_file, column_separator='|') assert ret, 'stream load failed' ret = client.create_table(table_name, SCHEMA.array_table_list, keys_desc=SCHEMA.duplicate_key, partition_info=SCHEMA.baseall_tinyint_partition_info) assert ret, 'create table failed' # column, where, partition columns = ['k1', 'b1', 'a2', 'a3', 'a4', 'a5', 'a6', 'a7', 'a8', 'a9', 'a10', 'a11', 'a12', 'a13', 'a14', 'a1=[]'] ret = client.stream_load(table_name, FILE.test_array_table_local_file, column_separator='|', column_name_list=columns, where='size(a2) > 270', partitions='p1, p2') assert not ret, 'expect stream load failed' columns = ['k1', 'b1', 'a2', 'a3', 'a4', 'a5', 'a6', 'a7', 'a8', 'a9', 'a10', 'a11', 'a12', 'a13', 'a14', 'a1=[]'] ret = client.stream_load(table_name, FILE.test_array_table_local_file, column_separator='|', column_name_list=columns, where='size(a2) > 270', partitions='p1, p2', max_filter_ratio=0.8) assert ret, 'stream load failed' sql1 = 'select * from %s order by k1' % table_name sql2 = 'select k1, [], a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14 from %s ' \ 'where size(a2) > 270 and k1 < 0 order by k1' % check_tb common.check2(client, sql1=sql1, sql2=sql2) # array function client.truncate(table_name) columns = ['k1', 'a1', 'a2', 'a3', 'a4', 'a5', 'a6', 'a7', 'a8', 'a9', 'a10', 'a11', 'a12', 'a13', 'b14', 'a14=array_sort(array_union(cast(a12 as array), cast(a13 as array)))'] ret = client.stream_load(table_name, FILE.test_array_table_local_file, column_separator='|', column_name_list=columns, where='size(a2) > 270', partitions='p1, p2', max_filter_ratio=0.8) assert ret, 'stream load failed' sql1 = 'select * from %s order by k1' % table_name sql2 = 'select k1, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, ' \ 'array_sort(array_union(a12, a13)) from %s where size(a2) > 270 and k1 < 0 order by k1' % check_tb common.check2(client, sql1=sql1, sql2=sql2) # array function, todo core client.truncate(table_name) columns = ['k1', 'a1', 'a2', 'a3', 'a4', 'a5', 'a6', 'a7', 'a8', 'a9', 'a10', 'a11', 'a12', 'a13', 'b14', 'a14=array_remove(cast(a5 as array), 1)'] ret = client.stream_load(table_name, FILE.test_array_table_local_file, column_separator='|', column_name_list=columns, where='size(a2) > 270', partitions='p1, p2', max_filter_ratio=0.8) assert ret, 'stream load failed' sql1 = 'select * from %s order by k1' % table_name sql2 = 'select k1, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, array_sort(array_union(a12, a13)) ' \ 'from %s where size(a2) > 270 and k1 < 0 order by k1' % check_tb common.check2(client, sql1=sql1, sql2=sql2) # client.clean(database_name) def test_array_broker_param(): """ { "title": "test_array_broker_param", "describe": "导入时,设置的一些参数,如set,where,columns,partition", "tag": "function,p0" } """ database_name, table_name, index_name = util.gen_name_list() client = common.create_workspace(database_name) check_tb = 'check_tb' ret = client.create_table(check_tb, SCHEMA.array_table_list, keys_desc=SCHEMA.duplicate_key) assert ret, 'create table failed' ret = client.stream_load(check_tb, FILE.test_array_table_local_file, column_separator='|') assert ret, 'stream load failed' ret = client.create_table(table_name, SCHEMA.array_table_list, keys_desc=SCHEMA.duplicate_key, partition_info=SCHEMA.baseall_tinyint_partition_info) assert ret, 'create table failed' # broker load & check column_name_list = ['k1', 'a1', 'a2', 'a3', 'a4', 'a5', 'a6', 'a7', 'a8', 'a9', 'a10', 'a11', 'a12', 'a13', 'b14'] set_list = ['a14=array_union(cast(b14 as array), cast(a13 as array))'] load_data_info = palo_client.LoadDataInfo(FILE.test_array_table_remote_file, table_name, partition_list=['p1', 'p2'], column_name_list=column_name_list, set_list=set_list, column_terminator='|', where_clause='size(a2) > 270') ret = client.batch_load(util.get_label(), load_data_info, max_filter_ratio=0.8, broker=broker_info, is_wait=True) assert ret sql1 = 'select * from %s order by k1' % table_name sql2 = 'select k1, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, array_union(a14, a13) from %s ' \ 'where size(a2) > 270 and k1 < 0 order by k1' % check_tb common.check2(client, sql1=sql1, sql2=sql2) client.truncate(table_name) set_list = ['a14=[]'] load_data_info = palo_client.LoadDataInfo(FILE.test_array_table_remote_file, table_name, partition_list=['p1', 'p2'], column_name_list=column_name_list, set_list=set_list, column_terminator='|', where_clause='size(a2) > 270') ret = client.batch_load(util.get_label(), load_data_info, max_filter_ratio=0.8, broker=broker_info, is_wait=True) assert ret sql1 = 'select * from %s order by k1' % table_name sql2 = 'select k1, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, [] from %s ' \ 'where size(a2) > 270 and k1 < 0 order by k1' % check_tb common.check2(client, sql1=sql1, sql2=sql2) client.truncate(table_name) set_list = ['a14=[]'] load_data_info = palo_client.LoadDataInfo(FILE.test_array_table_remote_file, table_name, partition_list=['p1', 'p2'], column_name_list=column_name_list, set_list=set_list, column_terminator='|', where_clause='size(array_distinct(a2)) = 1') ret = client.batch_load(util.get_label(), load_data_info, max_filter_ratio=0.8, broker=broker_info, is_wait=True) assert ret sql1 = 'select * from %s order by k1' % table_name sql2 = 'select k1, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, [] from %s ' \ 'where size(array_distinct(a2)) = 1 and k1 < 0 order by k1' % check_tb common.check2(client, sql1=sql1, sql2=sql2) client.clean(database_name) if __name__ == '__main__': setup_module() # test_array_decimal() # todo: 10000000000000000000.1111111222222222 vs 100000000000000000.001111111 # test_array_char() # todo 逗号数据不正确问题,json转义字符问题 # test_array_stream_param() # todo stream load core test_array_strict() # todo