#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
|
|
/***************************************************************************
|
|
*
|
|
* @file test_sys_partition_schema_change.py
|
|
* @date 2015/02/04 15:26:21
|
|
* @brief This file is a test file for Palo schema changing.
|
|
*
|
|
**************************************************************************/
|
|
对于unique表的导入来说,每条数据都是有全部的key的,相当于是按照全key进行数据delete的
|
|
"""
|
|
|
|

import os
import sys
import time

sys.path.append("../")
sys.path.append("../../")
from data import schema as DATA
from data import load_file as FILE
from lib import palo_config
from lib import palo_client
from lib import util
from lib import common
from lib import palo_job
from lib import kafka_config

config = palo_config.config
LOG = palo_client.LOG
L = palo_client.L
broker_info = palo_config.broker_info
TOPIC = 'routine-load-delete-%s' % config.fe_query_port
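
# Note on the merge types exercised below (a summary of the batch-delete
# behaviour these cases rely on; exact semantics may vary by version):
#   APPEND - the default: loaded rows are upserted into the unique-key table
#   DELETE - every loaded row is treated as a delete-by-key marker
#   MERGE  - rows matching the DELETE ON predicate are deleted, the rest upserted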


def setup_module():
    """set up"""
    global check_db, baseall_tb
    baseall_tb = 'baseall'
    if 'FE_DB' in os.environ.keys():
        check_db = os.environ['FE_DB']
    else:
        check_db = 'test_query_qa'


def teardown_module():
    """tear down"""
    pass


def test_delete_broker_basic():
    """
    {
    "title": "test_delete_broker_basic",
    "describe": "Verify the basic DELETE function of broker load",
    "tag": "p1,function"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.baseall_column_no_agg_list,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.baseall_unique_key)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
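    # __DORIS_DELETE_SIGN__ is the hidden column that batch delete relies on;
    # DESC only shows it while show_hidden_columns is enabled, so the variable
    # is toggled around the check.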
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert client.set_variables('show_hidden_columns', 0)
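    # The delete load below roughly corresponds to this broker load statement
    # (a sketch; the exact SQL is assembled inside palo_client):
    #   LOAD LABEL <label> (
    #       DELETE DATA INFILE("<hdfs file>") INTO TABLE <table>
    #   ) WITH BROKER ...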
    # delete load into an empty table: the table stays empty
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='DELETE')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret, 'broker load failed'
    ret = client.select_all(table_name)
    assert ret == (), 'check failed'
    # load data into the table, then delete load: the table is expected to be empty
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name)
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret, 'broker load failed'
    sql = 'select * from %s.%s order by k1'
    common.check2(client, sql1=sql % (database_name, table_name), sql2=sql % (check_db, baseall_tb))
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='DELETE')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret, 'broker load failed'
    ret = client.select_all(table_name)
    assert ret == (), 'check failed'
    client.clean(database_name)


def test_delete_broker_column_set():
    """
    {
    "title": "test_delete_broker_column_set",
    "describe": "Verify column settings for broker load DELETE",
    "tag": "p1,function"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.datatype_column_uniq_key)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert client.set_variables('show_hidden_columns', 0)
    # with column mapping and set expressions, delete load into an empty table
    column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9', 'k10', 'k11', 'k12']
    set_list = ['k0=k7', 'k5=k4']
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='DELETE',
                                              column_name_list=column_name_list, set_list=set_list)
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret, 'broker load failed'
    ret = client.select_all(table_name)
    assert ret == (), 'check failed'
    # load data into the table, then delete load: the table is expected to be empty
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name,
                                              column_name_list=column_name_list, set_list=set_list)
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret, 'broker load failed'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \
           'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='DELETE',
                                              column_name_list=column_name_list, set_list=set_list)
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret, 'broker load failed'
    ret = client.select_all(table_name)
    assert ret == (), 'check failed'
    client.clean(database_name)


def test_delete_broker_filter_ratio():
    """
    {
    "title": "test_delete_broker_filter_ratio",
    "describe": "Verify data filtering for broker load DELETE",
    "tag": "p1,function"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list,
                              partition_info=DATA.baseall_tinyint_partition_info,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.datatype_column_uniq_key)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert client.set_variables('show_hidden_columns', 0)
    # with column mapping and set expressions, delete load into an empty table
    column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9', 'k10', 'k11', 'k12']
    set_list = ['k0=k7', 'k5=k4']
    where = 'k1 > 8'
    partitions = ['p3']
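    # Only partition p3 is targeted, so rows routed to other partitions count
    # as filtered rows; max_filter_ratio=1 keeps the load from failing on them.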
    load_data_desc1 = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='DELETE',
                                               column_name_list=column_name_list, set_list=set_list,
                                               where_clause=where, partition_list=partitions)
    ret = client.batch_load(util.get_label(), load_data_desc1, broker=broker_info, is_wait=True, max_filter_ratio=1)
    assert ret, 'broker load failed'
    ret = client.select_all(table_name)
    assert ret == (), 'check failed'
    # load data into the table, then delete load
    load_data_desc2 = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name,
                                               column_name_list=column_name_list, set_list=set_list)
    ret = client.batch_load(util.get_label(), load_data_desc2, broker=broker_info, is_wait=True)
    assert ret, 'broker load failed'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \
           'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    ret = client.batch_load(util.get_label(), load_data_desc1, broker=broker_info, is_wait=True, max_filter_ratio=1)
    assert ret, 'broker load failed'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \
           'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s where k1 != 9 order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    client.clean(database_name)


def test_merge_broker_basic():
    """
    {
    "title": "test_merge_broker_basic",
    "describe": "Verify the basic MERGE function of broker load",
    "tag": "p1,function"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.baseall_column_no_agg_list,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.baseall_unique_key)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert client.set_variables('show_hidden_columns', 0)
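    # The MERGE loads below roughly correspond to this statement form (a
    # sketch; the exact SQL is assembled inside palo_client):
    #   LOAD LABEL <label> (
    #       MERGE DATA INFILE("<hdfs file>") INTO TABLE <table>
    #       DELETE ON <predicate>
    #   ) WITH BROKER ...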
    # merge into an empty table, DELETE ON predicate hits all rows;
    # TODO: set show_hidden_columns to inspect the hidden deleted rows
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE',
                                              delete_on_predicates='k1 > 0')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret, 'broker load failed'
    ret = client.select_all(table_name)
    assert ret == (), 'check failed'
    # load data into the table, then merge; DELETE ON hits no rows, so all data is loaded
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE',
                                              delete_on_predicates='k1 = 0')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret, 'broker load failed'
    sql = 'select * from %s.%s order by k1'
    common.check2(client, sql1=sql % (database_name, table_name), sql2=sql % (check_db, baseall_tb))
    # load again; DELETE ON hits part of the rows: matched rows are deleted, the rest stay unchanged
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE',
                                              delete_on_predicates='k2 > 0')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret, 'broker load failed'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select * from %s.%s where k2 <= 0 order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    client.clean(database_name)


def test_merge_broker_set_columns():
    """
    {
    "title": "test_merge_broker_set_columns",
    "describe": "Verify column settings for broker load MERGE",
    "tag": "p1,function"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.datatype_column_uniq_key)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert client.set_variables('show_hidden_columns', 0)
    # with column mapping and set expressions, merge load into an empty table
    column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9', 'k10', 'k11', 'k12']
    set_list = ['k0=k7', 'k5=k4']
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE',
                                              column_name_list=column_name_list, set_list=set_list,
                                              delete_on_predicates='k1 > 0')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret, 'broker load failed'
    ret = client.select_all(table_name)
    assert ret == (), 'check failed'
    # load data into the table, then merge; DELETE ON hits no rows
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE',
                                              column_name_list=column_name_list, set_list=set_list,
                                              delete_on_predicates='k1=0')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret, 'broker load failed'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \
           'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
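    # Note: the DELETE ON predicate below references k7, the source-field name
    # that receives baseall's k6 ("true"/"false") values under this column
    # mapping, which is why the expected result filters baseall on k6.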
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE',
                                              column_name_list=column_name_list, set_list=set_list,
                                              delete_on_predicates='k7="false"')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret, 'broker load failed'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \
           'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s where k6 != "false" order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    client.clean(database_name)


def test_merge_broker_filter_ratio():
    """
    {
    "title": "test_merge_broker_filter_ratio",
    "describe": "Verify data filtering for broker load MERGE",
    "tag": "p1,function"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list,
                              partition_info=DATA.baseall_tinyint_partition_info,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.datatype_column_uniq_key)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert client.set_variables('show_hidden_columns', 0)
    # with column mapping and set expressions, merge load into an empty table
    column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9', 'k10', 'k11', 'k12']
    set_list = ['k0=k7', 'k5=k4']
    where = 'k1 > 8'
    partitions = ['p3']
    load_data_desc1 = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE',
                                               column_name_list=column_name_list, set_list=set_list,
                                               where_clause=where, partition_list=partitions,
                                               delete_on_predicates='k1 > 0')
    ret = client.batch_load(util.get_label(), load_data_desc1, broker=broker_info, is_wait=True, max_filter_ratio=1)
    assert ret, 'broker load failed'
    ret = client.select_all(table_name)
    assert ret == (), 'check failed'
    # load data into the table, then merge load
    load_data_desc2 = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE',
                                               column_name_list=column_name_list, set_list=set_list,
                                               delete_on_predicates='k1=0')
    ret = client.batch_load(util.get_label(), load_data_desc2, broker=broker_info, is_wait=True)
    assert ret, 'broker load failed'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \
           'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    load_data_desc1 = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE',
                                               column_name_list=column_name_list, set_list=set_list,
                                               where_clause='k5 is not null', partition_list=['p1', 'p2', 'p3', 'p4'],
                                               delete_on_predicates='k8 > "2000-01-01"')
    ret = client.batch_load(util.get_label(), load_data_desc1, broker=broker_info, is_wait=True, max_filter_ratio=1)
    assert ret, 'broker load failed'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \
           'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s where k10 <= "20000101" order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    client.clean(database_name)


def test_delete_stream_basic():
    """
    {
    "title": "test_delete_stream_basic",
    "describe": "Verify the basic DELETE function of stream load",
    "tag": "p1,function"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.baseall_column_no_agg_list,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.baseall_unique_key)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert client.set_variables('show_hidden_columns', 0)
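    # Stream load passes the merge type as an HTTP header; the call below is
    # roughly equivalent to (a sketch, assuming the usual stream load endpoint):
    #   curl -u user: -H "merge_type: DELETE" -T baseall.txt \
    #       http://<fe_host>:<http_port>/api/<db>/<table>/_stream_load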
    # delete load into an empty table: the table stays empty
    ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='DELETE')
    assert ret, 'stream load failed'
    ret = client.select_all(table_name)
    assert ret == (), 'check failed'
    # load data into the table, then delete load: the table is expected to be empty
    ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='APPEND')
    assert ret, 'stream load failed'
    sql = 'select * from %s.%s order by k1'
    common.check2(client, sql1=sql % (database_name, table_name), sql2=sql % (check_db, baseall_tb))
    ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='DELETE')
    assert ret, 'stream load failed'
    ret = client.select_all(table_name)
    assert ret == (), 'check failed'
    client.clean(database_name)


def test_delete_stream_column_set():
    """
    {
    "title": "test_delete_stream_column_set",
    "describe": "Verify column settings for stream load DELETE",
    "tag": "p1,function"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.datatype_column_uniq_key)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert client.set_variables('show_hidden_columns', 0)
    # with column mapping and set expressions, delete load into an empty table
    column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9',
                        'k10', 'k11', 'k12', 'k0=k7', 'k5=k4']
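    # Unlike the broker load cases, stream load folds the SET expressions
    # (k0=k7, k5=k4) into the columns list instead of a separate set_list.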
    ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list,
                             merge_type='DELETE')
    assert ret, 'stream load failed'
    ret = client.select_all(table_name)
    assert ret == (), 'check failed'
    # load data into the table, then delete load: the table is expected to be empty
    ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list,
                             merge_type='APPEND')
    assert ret, 'stream load failed'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \
           'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list,
                             merge_type='DELETE')
    assert ret, 'stream load failed'
    ret = client.select_all(table_name)
    assert ret == (), 'check failed'
    client.clean(database_name)


def test_delete_stream_filter_ratio():
    """
    {
    "title": "test_delete_stream_filter_ratio",
    "describe": "Verify data filtering for stream load DELETE",
    "tag": "p1,function"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list,
                              partition_info=DATA.baseall_tinyint_partition_info,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.datatype_column_uniq_key)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert client.set_variables('show_hidden_columns', 0)
    # with column mapping and set expressions, delete load into an empty table
    column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9',
                        'k10', 'k11', 'k12', 'k0=k7', 'k5=k4']
    where = 'k1 > 8'
    partitions = ['p3']

    ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list,
                             where_filter=where, partition_list=partitions, merge_type='DELETE', max_filter_ratio=1)
    assert ret, 'stream load failed'
    ret = client.select_all(table_name)
    assert ret == (), 'check failed'
    # load data into the table, then delete load
    ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list,
                             merge_type='APPEND')
    assert ret, 'stream load failed'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \
           'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list,
                             where_filter=where, partition_list=partitions, merge_type='DELETE', max_filter_ratio=1)
    assert ret, 'stream load failed'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \
           'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s where k1 != 9 order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    client.clean(database_name)


def test_merge_stream_basic():
    """
    {
    "title": "test_merge_stream_basic",
    "describe": "Verify the basic MERGE function of stream load",
    "tag": "p1,function"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.baseall_column_no_agg_list,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.baseall_unique_key)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert client.set_variables('show_hidden_columns', 0)
    # merge into an empty table, DELETE ON predicate hits all rows;
    # TODO: set show_hidden_columns to inspect the hidden deleted rows
    ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='MERGE', delete='k1>0')
    assert ret, 'stream load failed'
    ret = client.select_all(table_name)
    assert ret == (), 'check failed'
    # load data into the table, then merge; DELETE ON hits no rows, so all data is loaded
    ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='MERGE', delete='k1=0')
    assert ret, 'stream load failed'
    sql = 'select * from %s.%s order by k1'
    common.check2(client, sql1=sql % (database_name, table_name), sql2=sql % (check_db, baseall_tb))
    # load again; DELETE ON hits part of the rows: matched rows are deleted, the rest stay unchanged
    ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='MERGE', delete='k2>0')
    assert ret, 'stream load failed'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select * from %s.%s where k2 <= 0 order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    client.clean(database_name)


def test_merge_stream_set_columns():
    """
    {
    "title": "test_merge_stream_set_columns",
    "describe": "Verify column settings for stream load MERGE",
    "tag": "p1,function"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.datatype_column_uniq_key)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert client.set_variables('show_hidden_columns', 0)
    # with column mapping and set expressions, merge load into an empty table
    column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9',
                        'k10', 'k11', 'k12', 'k0=k7', 'k5=k4']
    ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list,
                             merge_type='MERGE', delete='k1>0')
    assert ret, 'stream load failed'
    ret = client.select_all(table_name)
    assert ret == (), 'check failed'
    # load data into the table, then merge; DELETE ON hits no rows
    ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list,
                             merge_type='MERGE', delete='k1=0')
    assert ret, 'stream load failed'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \
           'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list,
                             merge_type='MERGE', delete='k7="false"')
    assert ret, 'stream load failed'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \
           'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s where k6 != "false" order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    client.clean(database_name)


def test_merge_stream_filter_ratio():
    """
    {
    "title": "test_merge_stream_filter_ratio",
    "describe": "Verify data filtering for stream load MERGE",
    "tag": "p1,function"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list,
                              partition_info=DATA.baseall_tinyint_partition_info,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.datatype_column_uniq_key)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert client.set_variables('show_hidden_columns', 0)
    # with column mapping and set expressions, merge load into an empty table
    column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9',
                        'k10', 'k11', 'k12', 'k0=k7', 'k5=k4']
    where = 'k1 > 8'
    partitions = ['p3']
    ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list,
                             where_filter=where, partition_list=partitions, max_filter_ratio=1,
                             merge_type='MERGE', delete='k1>0')
    assert ret, 'stream load failed'
    ret = client.select_all(table_name)
    assert ret == (), 'check failed'
    # load data into the table, then merge; DELETE ON hits no rows
    ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list,
                             merge_type='MERGE', delete='k1=0')
    assert ret, 'stream load failed'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \
           'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list,
                             where_filter='k5 is not null', partition_list=['p1', 'p2', 'p3', 'p4'],
                             merge_type='MERGE', delete='k8 > "2000-01-01"')
    assert ret, 'stream load failed'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \
           'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s where k10 <= "20000101" order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    client.clean(database_name)


def test_delete_routine_basic():
    """
    {
    "title": "test_delete_routine_basic",
    "describe": "Verify the basic DELETE function of routine load",
    "tag": "p1,function"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.baseall_column_no_agg_list,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.baseall_unique_key)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    # enable batch delete & check
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert client.set_variables('show_hidden_columns', 0)
    # 1. delete load into an empty table: the table stays empty
    # create routine load
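    # The property calls below build something like the following DDL (a
    # sketch; the exact statement is generated by palo_client):
    #   CREATE ROUTINE LOAD <job> ON <table> WITH DELETE
    #   FROM KAFKA ("kafka_broker_list" = "...", "kafka_topic" = "...",
    #               "kafka_partitions" = "...", "kafka_offsets" = "...")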
    routine_load_job_name = util.get_label()
    routine_load_property = palo_client.RoutineLoadProperty()
    routine_load_property.set_kafka_broker_list(kafka_config.kafka_broker_list)
    routine_load_property.set_kafka_topic(TOPIC)
    partition_offset = kafka_config.get_topic_offset(TOPIC)
    routine_load_property.set_kafka_partitions(','.join(partition_offset.keys()))
    routine_load_property.set_kafka_offsets(','.join(partition_offset.values()))
    routine_load_property.set_merge_type('DELETE')
    ret = client.routine_load(table_name, routine_load_job_name, routine_load_property=routine_load_property)
    assert ret, 'routine load create failed'
    routine_load_job = palo_job.RoutineLoadJob(client.show_routine_load(routine_load_job_name)[0])
    ret = (routine_load_job.get_merge_type() == 'DELETE')
    common.assert_stop_routine_load(ret, client, routine_load_job_name, 'expect delete merge type')
    client.wait_routine_load_state(routine_load_job_name)
    # send kafka data & check, expect empty table
    kafka_config.send_to_kafka(TOPIC, '../qe/baseall.txt')
    client.wait_routine_load_commit(routine_load_job_name, 15)
    ret = client.select_all(table_name)
    common.assert_stop_routine_load(ret == (), client, routine_load_job_name, 'check error')
    # 2. load data into the table, then delete load: the table is expected to be empty
    # stream load and check
    ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='APPEND')
    common.assert_stop_routine_load(ret, client, routine_load_job_name, 'stream load failed')
    sql = 'select * from %s.%s order by k1'
    common.check2(client, sql1=sql % (database_name, table_name), sql2=sql % (check_db, baseall_tb))
    # send kafka data & check
    kafka_config.send_to_kafka(TOPIC, '../qe/baseall.txt')
    client.wait_routine_load_commit(routine_load_job_name, 30)
    ret = client.select_all(table_name)
    client.stop_routine_load(routine_load_job_name)
    assert ret == (), 'check failed'
    client.clean(database_name)


def test_delete_routine_column_set():
    """
    {
    "title": "test_delete_routine_column_set",
    "describe": "Verify routine load DELETE with column settings",
    "tag": "p1,function"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.datatype_column_uniq_key)
    assert ret, 'create table failed'
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert client.set_variables('show_hidden_columns', 0)
    # with column mapping and set expressions, delete load into an empty table
    column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9',
                        'k10', 'k11', 'k12', 'k0=k7', 'k5=k4']
    routine_load_job_name = util.get_label()
    routine_load_property = palo_client.RoutineLoadProperty()
    routine_load_property.set_kafka_broker_list(kafka_config.kafka_broker_list)
    routine_load_property.set_kafka_topic(TOPIC)
    partition_offset = kafka_config.get_topic_offset(TOPIC)
    routine_load_property.set_kafka_partitions(','.join(partition_offset.keys()))
    routine_load_property.set_kafka_offsets(','.join(partition_offset.values()))
    routine_load_property.set_merge_type('DELETE')
    routine_load_property.set_column_mapping(column_name_list)
    ret = client.routine_load(table_name, routine_load_job_name, routine_load_property=routine_load_property)
    assert ret, 'routine load create failed'
    client.wait_routine_load_state(routine_load_job_name)
    kafka_config.send_to_kafka(TOPIC, '../qe/baseall.txt')
    client.wait_routine_load_commit(routine_load_job_name, 15)
    ret = client.select_all(table_name)
    assert ret == (), 'check failed'
    # load data into the table, then delete load: the table is expected to be empty
    ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list,
                             merge_type='APPEND')
    common.assert_stop_routine_load(ret, client, routine_load_job_name, 'stream load failed')
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \
           'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    kafka_config.send_to_kafka(TOPIC, '../qe/baseall.txt')
    client.wait_routine_load_commit(routine_load_job_name, 30)
    ret = client.select_all(table_name)
    client.stop_routine_load(routine_load_job_name)
    assert ret == (), 'check failed'
    client.clean(database_name)


def test_delete_routine_filter_ratio():
    """
    {
    "title": "test_delete_routine_filter_ratio",
    "describe": "Verify data filtering for routine load DELETE",
    "tag": "p1,function"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list,
                              partition_info=DATA.baseall_tinyint_partition_info,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.datatype_column_uniq_key)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert client.set_variables('show_hidden_columns', 0)
    # with column mapping and set expressions, delete load into an empty table
    column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9',
                        'k10', 'k11', 'k12', 'k0=k7', 'k5=k4']
    routine_load_job_name = util.get_label()
    routine_load_property = palo_client.RoutineLoadProperty()
    routine_load_property.set_kafka_broker_list(kafka_config.kafka_broker_list)
    routine_load_property.set_kafka_topic(TOPIC)
    partition_offset = kafka_config.get_topic_offset(TOPIC)
    routine_load_property.set_kafka_partitions(','.join(partition_offset.keys()))
    routine_load_property.set_kafka_offsets(','.join(partition_offset.values()))
    routine_load_property.set_merge_type('DELETE')
    routine_load_property.set_column_mapping(column_name_list)
    routine_load_property.set_where_predicates('k1 > 8')
    routine_load_property.set_partitions(['p3'])
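    # routine load has no max_filter_ratio; max_error_number plays a similar
    # role here for the rows rejected by the where/partition settings above.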
    routine_load_property.set_max_error_number(15)
    ret = client.routine_load(table_name, routine_load_job_name, routine_load_property=routine_load_property)
    assert ret, 'routine load create failed'
    client.wait_routine_load_state(routine_load_job_name)
    kafka_config.send_to_kafka(TOPIC, '../qe/baseall.txt')
    client.wait_routine_load_commit(routine_load_job_name, 1)
    ret = client.select_all(table_name)
    common.assert_stop_routine_load(ret == (), client, routine_load_job_name, 'check failed')
    # load data into the table, then delete load
    ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list,
                             merge_type='APPEND')
    common.assert_stop_routine_load(ret, client, routine_load_job_name, 'stream load failed')

    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \
           'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    kafka_config.send_to_kafka(TOPIC, '../qe/baseall.txt')
    client.wait_routine_load_commit(routine_load_job_name, 2)
    ret = client.show_routine_load(routine_load_job_name)
    routine_load_job = palo_job.RoutineLoadJob(ret[0])
    error_rows = routine_load_job.get_error_rows()
    state = routine_load_job.get_state()
    client.stop_routine_load(routine_load_job_name)
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \
           'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s where k1 != 9 order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    client.clean(database_name)


def test_merge_routine_basic():
    """
    {
    "title": "test_merge_routine_basic",
    "describe": "Verify the basic MERGE function of routine load",
    "tag": "p1,function"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.baseall_column_no_agg_list,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.baseall_unique_key)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert client.set_variables('show_hidden_columns', 0)
    # merge into an empty table, DELETE ON predicate hits part of the rows;
    # TODO: set show_hidden_columns to inspect the hidden deleted rows
    routine_load_job_name = util.get_label()
    routine_load_property = palo_client.RoutineLoadProperty()
    routine_load_property.set_kafka_broker_list(kafka_config.kafka_broker_list)
    routine_load_property.set_kafka_topic(TOPIC)
    partition_offset = kafka_config.get_topic_offset(TOPIC)
    routine_load_property.set_kafka_partitions(','.join(partition_offset.keys()))
    routine_load_property.set_kafka_offsets(','.join(partition_offset.values()))
    routine_load_property.set_merge_type('MERGE')
    routine_load_property.set_delete_on_predicates('k1 % 3 = 0')
    ret = client.routine_load(table_name, routine_load_job_name, routine_load_property=routine_load_property)
    assert ret, 'routine load create failed'
    client.wait_routine_load_state(routine_load_job_name)
    kafka_config.send_to_kafka(TOPIC, '../qe/baseall.txt')
    client.wait_routine_load_commit(routine_load_job_name, 15)
    client.stop_routine_load(routine_load_job_name)
    ret = client.select_all(table_name)
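    # the baseall data set evidently holds 15 distinct keys here: 'k1 % 3 = 0'
    # marks 5 of them for delete on the way in, leaving 10 visible rows
    # (inferred from the assertion below, not from the data file itself)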
    assert len(ret) == 10, 'check failed'
    # load data into the table, then merge; DELETE ON hits no rows, so all data is loaded
    ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='MERGE', delete='k1=0')
    assert ret, 'stream load failed'
    sql = 'select * from %s.%s order by k1'
    common.check2(client, sql1=sql % (database_name, table_name), sql2=sql % (check_db, baseall_tb))
    # load again; DELETE ON hits part of the rows: matched rows are deleted, the rest stay unchanged
    routine_load_job_name = util.get_label()
    routine_load_property.set_delete_on_predicates('k9 > 0')
    # kafka offsets and partitions are set
    partition_offset = kafka_config.get_topic_offset(TOPIC)
    routine_load_property.set_kafka_partitions(','.join(partition_offset.keys()))
    routine_load_property.set_kafka_offsets(','.join(partition_offset.values()))
    ret = client.routine_load(table_name, routine_load_job_name, routine_load_property=routine_load_property)
    assert ret, 'routine load create failed'
    client.wait_routine_load_state(routine_load_job_name)
    kafka_config.send_to_kafka(TOPIC, '../qe/baseall.txt')
    client.wait_routine_load_commit(routine_load_job_name, 15)
    client.stop_routine_load(routine_load_job_name)
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select * from %s.%s where k9 <= 0 order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    client.clean(database_name)


def test_merge_routine_set_columns():
    """
    {
    "title": "test_merge_routine_set_columns",
    "describe": "Verify column settings for routine load MERGE",
    "tag": "p1,function"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.datatype_column_uniq_key)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert client.set_variables('show_hidden_columns', 0)
    # with column mapping and set expressions, merge load into an empty table
    column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9',
                        'k10', 'k11', 'k12', 'k0=k7', 'k5=k4']
    routine_load_job_name = util.get_label()
    routine_load_property = palo_client.RoutineLoadProperty()
    routine_load_property.set_kafka_broker_list(kafka_config.kafka_broker_list)
    routine_load_property.set_kafka_topic(TOPIC)
    partition_offset = kafka_config.get_topic_offset(TOPIC)
    routine_load_property.set_kafka_partitions(','.join(partition_offset.keys()))
    routine_load_property.set_kafka_offsets(','.join(partition_offset.values()))
    routine_load_property.set_merge_type('MERGE')
    routine_load_property.set_delete_on_predicates('k1 > 0')
    routine_load_property.set_column_mapping(column_name_list)
    ret = client.routine_load(table_name, routine_load_job_name, routine_load_property=routine_load_property)
    assert ret, 'routine load create failed'
    client.wait_routine_load_state(routine_load_job_name)
    kafka_config.send_to_kafka(TOPIC, '../qe/baseall.txt')
    client.wait_routine_load_commit(routine_load_job_name, 15)
    client.stop_routine_load(routine_load_job_name)
    ret = client.select_all(table_name)
    assert ret == (), 'check failed'
    # load data into the table, then merge load
    ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list,
                             merge_type='APPEND')
    assert ret, 'stream load failed'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \
           'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    routine_load_job_name = util.get_label()
    partition_offset = kafka_config.get_topic_offset(TOPIC)
    routine_load_property.set_kafka_partitions(','.join(partition_offset.keys()))
    routine_load_property.set_kafka_offsets(','.join(partition_offset.values()))
    routine_load_property.set_delete_on_predicates('k11<0')
    ret = client.routine_load(table_name, routine_load_job_name, routine_load_property=routine_load_property)
    assert ret, 'routine load create failed'
    client.wait_routine_load_state(routine_load_job_name)
    kafka_config.send_to_kafka(TOPIC, '../qe/baseall.txt')
    client.wait_routine_load_commit(routine_load_job_name, 15)
    client.stop_routine_load(routine_load_job_name)
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \
           'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s where k8 >= 0 order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    client.clean(database_name)


def test_merge_routine_filter_ratio():
    """
    {
    "title": "test_merge_routine_filter_ratio",
    "describe": "Verify data filtering for routine load MERGE",
    "tag": "p1,function"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list,
                              partition_info=DATA.baseall_tinyint_partition_info,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.datatype_column_uniq_key)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert client.set_variables('show_hidden_columns', 0)
    # with column mapping and set expressions, merge load into an empty table
    column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9',
                        'k10', 'k11', 'k12', 'k0=k7', 'k5=k4']
    where = 'k1 > 8'
    partitions = ['p3']
    routine_load_job_name = util.get_label()
    routine_load_property = palo_client.RoutineLoadProperty()
    routine_load_property.set_kafka_broker_list(kafka_config.kafka_broker_list)
    routine_load_property.set_kafka_topic(TOPIC)
    partition_offset = kafka_config.get_topic_offset(TOPIC)
    routine_load_property.set_kafka_partitions(','.join(partition_offset.keys()))
    routine_load_property.set_kafka_offsets(','.join(partition_offset.values()))
    routine_load_property.set_merge_type('MERGE')
    routine_load_property.set_delete_on_predicates('k1 > 0')
    routine_load_property.set_column_mapping(column_name_list)
    routine_load_property.set_where_predicates(where)
    routine_load_property.set_partitions(partitions)
    routine_load_property.set_max_error_number(15)
    ret = client.routine_load(table_name, routine_load_job_name, routine_load_property=routine_load_property)
    assert ret, 'routine load create failed'
    client.wait_routine_load_state(routine_load_job_name)
    kafka_config.send_to_kafka(TOPIC, '../qe/baseall.txt')
    client.wait_routine_load_commit(routine_load_job_name, 1)
    client.stop_routine_load(routine_load_job_name)
    ret = client.select_all(table_name)
    assert ret == (), 'check failed'
    # load data into the table, then merge load
    ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list,
                             merge_type='APPEND')
    assert ret, 'stream load failed'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \
           'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    # create routine delete load
    routine_load_job_name = util.get_label()
    routine_load_property.set_delete_on_predicates('k9 > "2000-01-01 00:00:00"')
    routine_load_property.set_where_predicates('k5 is not null')
    routine_load_property.set_partitions(['p1', 'p2', 'p3', 'p4'])
    partition_offset = kafka_config.get_topic_offset(TOPIC)
    routine_load_property.set_kafka_partitions(','.join(partition_offset.keys()))
    routine_load_property.set_kafka_offsets(','.join(partition_offset.values()))
    ret = client.routine_load(table_name, routine_load_job_name, routine_load_property=routine_load_property)
    assert ret, 'routine load create failed'
    client.wait_routine_load_state(routine_load_job_name)

    kafka_config.send_to_kafka(TOPIC, '../qe/baseall.txt')
    client.wait_routine_load_commit(routine_load_job_name, 15)
    client.stop_routine_load(routine_load_job_name)

    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \
           'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s where k11 <= "20000101" order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    client.clean(database_name)
def test_delete_with_delete_on():
    """
    {
    "title": "test_delete_with_delete_on",
    "describe": "verify that a DELETE ON clause combined with merge type DELETE is rejected",
    "tag": "p1,function,fuzz"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.baseall_column_no_agg_list,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.baseall_unique_key)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        # ignored: the table may already support batch delete
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert client.set_variables('show_hidden_columns', 0)
    # broker load fails
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='DELETE',
                                              delete_on_predicates='k1 > 0')
    msg = 'not support DELETE ON clause when merge type is not MERGE'
    util.assert_return(False, msg, client.batch_load, util.get_label(), load_data_desc, broker=broker_info)
    # stream load fails
    ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='DELETE', delete='k1>0')
    assert not ret, 'expect stream load failed'
    # routine load fails
    routine_load_job_name = util.get_label()
    routine_load_property = palo_client.RoutineLoadProperty()
    routine_load_property.set_kafka_broker_list(kafka_config.kafka_broker_list)
    routine_load_property.set_kafka_topic(TOPIC)
    partition_offset = kafka_config.get_topic_offset(TOPIC)
    routine_load_property.set_kafka_partitions(','.join(partition_offset.keys()))
    routine_load_property.set_kafka_offsets(','.join(partition_offset.values()))
    routine_load_property.set_merge_type('DELETE')
    routine_load_property.set_delete_on_predicates('k1 > 0')
    ret = client.routine_load(table_name, routine_load_job_name, routine_load_property=routine_load_property)
    assert not ret, 'expect create routine load failed'
    client.clean(database_name)

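# For orientation, the three rejected cases above correspond roughly to the
# following statement shapes (an assumption based on the public Doris
# batch-delete documentation, not on this suite's helpers; hosts, ports and
# label names are placeholders):
#   broker load:   LOAD LABEL db.lbl (DELETE DATA INFILE("hdfs://...") INTO TABLE tbl ... DELETE ON k1 > 0) WITH BROKER ...
#   stream load:   curl -H "merge_type: DELETE" -H "delete: k1>0" ... http://fe:8030/api/db/tbl/_stream_load
#   routine load:  CREATE ROUTINE LOAD db.job ON tbl WITH DELETE ... DELETE ON k1 > 0 FROM KAFKA (...)
# In each case DELETE ON is only legal when the merge type is MERGE.
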
def test_merge_with_delete_on():
    """
    {
    "title": "test_merge_with_delete_on",
    "describe": "verify that MERGE requires a DELETE ON clause and fails without one; test DELETE ON predicates, especially combined with SET column mappings",
    "tag": "p1,function,fuzz"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list,
                              partition_info=DATA.baseall_tinyint_partition_info,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.datatype_column_uniq_key)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert client.set_variables('show_hidden_columns', 0)
    # load fails without delete on
    column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9', 'k10', 'k11', 'k12']
    set_list = ['k0=k7', 'k5=k4']
    stream_column_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9',
                          'k10', 'k11', 'k12', 'k0=k7', 'k5=k4']
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE',
                                              column_name_list=column_name_list, set_list=set_list)
    # matched verbatim against the server error text, spelling included
    msg = 'Excepted DELETE ON clause when merge type is MERGE'
    util.assert_return(False, msg, client.batch_load, util.get_label(), load_data_desc, broker=broker_info)
    ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='MERGE',
                             column_name_list=stream_column_list)
    assert not ret, 'expect stream load failed'
    # set: k0=k7, delete: k0=true -> fail
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name,
                                              merge_type='MERGE', column_name_list=column_name_list,
                                              set_list=set_list, delete_on_predicates='k0=true')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert not ret, 'expect failed. unknown reference column, column=__DORIS_DELETE_SIGN__, reference=k0'
    ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='MERGE',
                             column_name_list=stream_column_list, delete='k0=true')
    assert not ret, 'expect failed. unknown reference column, column=__DORIS_DELETE_SIGN__, reference=k0'
    # set: k2=k2/2+1, delete on k2 and k1 -> succeed
    set_list = ['k0=k7', 'k5=k4', 'k2=k2/2+1']
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name,
                                              merge_type='MERGE', column_name_list=column_name_list,
                                              set_list=set_list, delete_on_predicates='k2>0 and abs(k1) = 1')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret, 'broker load failed'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, cast(k2/2 + 1 as int), k3, k4, ' \
           'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s where k1!=1 order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    stream_column_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9',
                          'k10', 'k11', 'k12', 'k0=k7', 'k5=k4', 'k2=k2/2 + 1']
    ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='MERGE',
                             column_name_list=stream_column_list, delete='k2 <= 0 or k1 not in (1)')
    assert ret, 'stream load failed'
    sql1 = 'select * from %s.%s' % (database_name, table_name)
    sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, cast(k2/2 + 1 as int), k3, k4, ' \
           'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s where k1=1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    client.clean(database_name)

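# Why 'k0=true' fails above while 'k2>0 and abs(k1) = 1' succeeds: the DELETE
# ON expression fills __DORIS_DELETE_SIGN__ per loaded row and, as the
# 'unknown reference column' error suggests, may only reference source columns
# of the load, not targets produced by SET such as k0. A per-row sketch of
# that order of evaluation (our own model, unused by the tests):
def _merge_row_pipeline_sketch(source_row, set_exprs, delete_on):
    """Return (final_row, delete_sign) for one loaded row."""
    sign = 1 if delete_on(source_row) else 0     # sees source columns only
    final_row = dict(source_row)
    for col, expr in set_exprs.items():          # e.g. {'k0': lambda r: r['k7']}
        final_row[col] = expr(source_row)        # SET applied independently
    return final_row, sign
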
def test_delete_merge_special():
    """
    {
    "title": "test_delete_merge_special",
    "describe": "delete & merge loads with special values (nulls, empty strings)",
    "tag": "p1,system,fuzz"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.char_normal_column_no_agg_list,
                              distribution_info=DATA.hash_distribution_info,
                              keys_desc=DATA.unique_key, set_null=True)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert client.set_variables('show_hidden_columns', 0)
    # load
    load_data_desc = palo_client.LoadDataInfo(FILE.test_char_hdfs_file, table_name, merge_type='DELETE')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret, 'broker load failed'
    ret = client.select_all(table_name)
    assert ret == (), 'check failed'

    load_data_desc = palo_client.LoadDataInfo(FILE.test_char_hdfs_file, table_name, merge_type='MERGE',
                                              delete_on_predicates='k1 is not null')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret, 'broker load failed'
    ret = client.select_all(table_name)
    assert ret == ((None, None),)

    load_data_desc = palo_client.LoadDataInfo(FILE.test_char_hdfs_file, table_name, merge_type='MERGE',
                                              delete_on_predicates='k1 is null')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret, 'broker load failed'
    ret1 = client.select_all(table_name)
    ret2 = ((u'hello', u'hello'), (u'H', u'H'), (u'hello,hello', u'hello,hello'), (u'h', u'h'),
            (u'\u4ed3\u5e93', u'\u5b89\u5168'), (u'', u''))
    util.check(ret1, ret2, True)
    # when k1 is NULL in the file, the delete on predicate k1="仓库" evaluates to NULL;
    # that row is treated as an error row and filtered, so max_filter_ratio must be set
    load_data_desc = palo_client.LoadDataInfo(FILE.test_char_hdfs_file, table_name, merge_type='MERGE',
                                              delete_on_predicates='k1="仓库"')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert not ret, 'expect broker load failed'
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True, max_filter_ratio=0.2)
    assert ret, 'broker load failed'
    ret1 = client.select_all(table_name)
    ret2 = ((u'H', u'H'), (u'hello,hello', u'hello,hello'), (u'h', u'h'), (u'', u''), (u'hello', u'hello'))
    util.check(ret1, ret2, True)

    load_data_desc = palo_client.LoadDataInfo(FILE.test_char_hdfs_file, table_name)
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret, 'broker load failed'
    load_data_desc = palo_client.LoadDataInfo(FILE.test_char_hdfs_file, table_name, merge_type='DELETE')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret, 'broker load failed'
    ret = client.select_all(table_name)
    assert ret == (), 'check failed'
    client.clean(database_name)

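# A minimal sketch of the NULL handling exercised just above (assumption drawn
# from the test's own comment: rows whose DELETE ON predicate evaluates to SQL
# NULL are counted as error rows against max_filter_ratio, neither kept nor
# deleted; the helper is unused by the tests):
def _delete_on_with_nulls_sketch(rows, delete_on):
    """Split rows by a three-valued predicate returning True, False, or None."""
    kept, deleted, filtered = [], [], []
    for row in rows:
        verdict = delete_on(row)      # None models SQL NULL
        if verdict is None:
            filtered.append(row)      # error row; needs max_filter_ratio > 0
        elif verdict:
            deleted.append(row)
        else:
            kept.append(row)
    return kept, deleted, filtered
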
def test_enable_batch_delete():
    """
    {
    "title": "test_enable_batch_delete",
    "describe": "enable repeatedly and check the result; after enable, drop the hidden columns and verify via load; run a delete load without enable; cover agg and duplicate tables",
    "tag": "p1,system,fuzz"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.baseall_column_no_agg_list,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.baseall_unique_key)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    try:
        # drop the hidden column first in case the table already supports batch delete
        client.set_variables('show_hidden_columns', 1)
        ret = client.schema_change_drop_column(table_name, ['__DORIS_DELETE_SIGN__'], is_wait_job=True)
        assert ret
    except Exception as e:
        pass
    # without enable, a delete load fails
    msg = 'load by MERGE or DELETE need to upgrade table to support batch delete'
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE',
                                              delete_on_predicates='k1 > 0')
    util.assert_return(False, msg, client.batch_load, util.get_label(), load_data_desc, broker=broker_info)
    # after enable, the load succeeds
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_VERSION_COL__')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret
    print(client.show_variables('show_hidden_columns'))
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, k9, 1, 2 from %s.%s order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    # enabling again fails
    msg = 'Can not enable batch delete support, already supported batch delete.'
    util.assert_return(False, msg, client.enable_feature_batch_delete, table_name)
    # dropping the hidden columns succeeds
    assert client.set_variables('show_hidden_columns', 1)
    msg = 'Nothing is changed. please check your alter stmt.'
    ret = client.schema_change_drop_column(table_name, ['__DORIS_DELETE_SIGN__', '__DORIS_VERSION_COL__'],
                                           is_wait_job=True)
    assert ret
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') is None
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_VERSION_COL__') is None
    msg = 'load by MERGE or DELETE need to upgrade table to support batch delete'
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE',
                                              delete_on_predicates='k1 > 0')
    util.assert_return(False, msg, client.batch_load, util.get_label(), load_data_desc, broker=broker_info)
    client.clean(database_name)

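# For reference: enable_feature_batch_delete() presumably wraps SQL of the form
#   ALTER TABLE <db>.<table> ENABLE FEATURE "BATCH_DELETE";
# (an assumption based on the public Doris batch-delete documentation), which
# adds the hidden __DORIS_DELETE_SIGN__ / __DORIS_VERSION_COL__ columns that
# the assertions above look for.
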
def test_delete_merge_rollup_1():
    """
    {
    "title": "test_delete_merge_rollup_1",
    "describe": "enable, load, create rollup, load again",
    "tag": "p1,system"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.baseall_column_no_agg_list,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.baseall_unique_key,
                              enable_unique_key_merge_on_write="false")
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_VERSION_COL__')
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE',
                                              delete_on_predicates='k1 = 1')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, k9, k1 = 1, 2 from %s.%s order by k1' \
           % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    rollup_list = ['k1', 'k2', 'k3', 'k4', 'k5', 'k6', 'k7', 'k10', 'k11']
    ret = client.create_rollup_table(table_name, index_name, rollup_list, is_wait=True)
    assert ret
    ret = client.desc_table(table_name, is_all=True)
    hidden_column = util.get_attr_condition_list(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert len(hidden_column) == 2, 'expect base table & mv have __DORIS_DELETE_SIGN__'
    hidden_column = util.get_attr_condition_list(ret, palo_job.DescInfoAll.Field, '__DORIS_VERSION_COL__')
    assert len(hidden_column) == 1, 'expect base table has __DORIS_VERSION_COL__'
    sql1 = 'select %s from %s.%s order by k1' % (','.join(rollup_list), database_name, table_name)
    sql2 = 'select %s from %s.%s order by k1' % (','.join(rollup_list), check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    idx = common.get_explain_rollup(client, sql1)
    assert index_name in idx
    assert client.set_variables('show_hidden_columns', 0)
    sql1 = 'select %s from %s.%s order by k1' % (','.join(rollup_list), database_name, table_name)
    sql2 = 'select %s from %s.%s where k1 != 1 order by k1' % (','.join(rollup_list), check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    idx = common.get_explain_rollup(client, sql1)
    assert index_name in idx
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE',
                                              delete_on_predicates='k1 != 1')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret
    sql1 = 'select %s from %s.%s' % (','.join(rollup_list), database_name, table_name)
    sql2 = 'select %s from %s.%s WHERE k1 = 1' % (','.join(rollup_list), check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    idx = common.get_explain_rollup(client, sql1)
    assert index_name in idx
    assert client.set_variables('show_hidden_columns', 1)
    sql1 = 'select %s from %s.%s order by k1' % (','.join(rollup_list), database_name, table_name)
    sql2 = 'select %s from %s.%s order by k1' % (','.join(rollup_list), check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    idx = common.get_explain_rollup(client, sql1)
    assert index_name in idx
    client.clean(database_name)

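# The hidden-column assertions in these rollup tests all follow one pattern;
# a small helper like this captures it (a sketch only, not used by the tests):
def _hidden_column_occurrences(client, table_name, column_name):
    """Count how many indexes (base table plus rollups) expose `column_name`
    in desc_table(is_all=True) output; assumes show_hidden_columns is 1."""
    desc_ret = client.desc_table(table_name, is_all=True)
    found = util.get_attr_condition_list(desc_ret, palo_job.DescInfoAll.Field, column_name)
    return len(found) if found else 0
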
def test_delete_merge_rollup_2():
    """
    {
    "title": "test_delete_merge_rollup_2",
    "describe": "create table, create rollup, load, enable, load again",
    "tag": "p1,system"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    # create table, create rollup & load
    ret = client.create_table(table_name, DATA.baseall_column_no_agg_list,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.baseall_unique_key,
                              enable_unique_key_merge_on_write="false")
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    rollup_list = ['k1', 'k2', 'k3', 'k4', 'k5', 'k6', 'k7', 'k10', 'k11']
    ret = client.create_rollup_table(table_name, index_name, rollup_list, is_wait=True)
    assert ret
    assert client.get_index(table_name, index_name)
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='APPEND')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret
    sql1 = 'select * from %s.%s order by k1'
    common.check2(client, sql1=sql1 % (database_name, table_name), sql2=sql1 % (check_db, baseall_tb))
    sql1 = 'select %s from %s.%s order by k1' % (','.join(rollup_list), database_name, table_name)
    sql2 = 'select %s from %s.%s order by k1' % (','.join(rollup_list), check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    idx = common.get_explain_rollup(client, sql1)
    assert index_name in idx
    # enable
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    hidden_column = util.get_attr_condition_list(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert len(hidden_column) == 2, 'expect base table & mv have __DORIS_DELETE_SIGN__'
    hidden_column = util.get_attr_condition_list(ret, palo_job.DescInfoAll.Field, '__DORIS_VERSION_COL__')
    assert len(hidden_column) == 1, 'expect base table has __DORIS_VERSION_COL__'

    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, k9, 0, 2 from %s.%s order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    sql1 = 'select %s from %s.%s order by k1' % (','.join(rollup_list), database_name, table_name)
    sql2 = 'select %s from %s.%s order by k1' % (','.join(rollup_list), check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    idx = common.get_explain_rollup(client, sql1)
    assert index_name in idx

    assert client.set_variables('show_hidden_columns', 0)
    common.check2(client, sql1=sql1, sql2=sql2)
    idx = common.get_explain_rollup(client, sql1)
    assert index_name in idx

    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE',
                                              delete_on_predicates='k1 > 10')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret
    sql1 = 'select %s from %s.%s order by k1' % (','.join(rollup_list), database_name, table_name)
    sql2 = 'select %s from %s.%s where k1 <= 10 order by k1' % (','.join(rollup_list), check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    idx = common.get_explain_rollup(client, sql1)
    assert index_name in idx

    assert client.set_variables('show_hidden_columns', 1)
    sql2 = 'select %s from %s.%s order by k1' % (','.join(rollup_list), check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    idx = common.get_explain_rollup(client, sql1)
    assert index_name in idx
    client.clean(database_name)

def test_add_column_delete_load():
    """
    {
    "title": "test_add_column_delete_load",
    "describe": "adding a column to a table with a rollup does not affect delete load",
    "tag": "p1,system"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    # create table
    ret = client.create_table(table_name, DATA.baseall_column_no_agg_list,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.baseall_unique_key,
                              enable_unique_key_merge_on_write="false")
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    # create materialized view (rollup)
    rollup_list = ['k1', 'k2', 'k3', 'k4', 'k5', 'k6', 'k7', 'k10', 'k11']
    ret = client.create_rollup_table(table_name, index_name, rollup_list, is_wait=True)
    assert ret
    assert client.get_index(table_name, index_name)
    # load & verify
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='APPEND')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret
    sql1 = 'select * from %s.%s order by k1'
    common.check2(client, sql1=sql1 % (database_name, table_name), sql2=sql1 % (check_db, baseall_tb))
    # enable batch delete and verify
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    hidden_column = util.get_attr_condition_list(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert len(hidden_column) == 2, 'expect base table & mv have __DORIS_DELETE_SIGN__'
    hidden_column = util.get_attr_condition_list(ret, palo_job.DescInfoAll.Field, '__DORIS_VERSION_COL__')
    assert len(hidden_column) == 1, 'expect base table has __DORIS_VERSION_COL__'
    # add a column and verify
    v_add = [('k_add', 'INT', '', '0')]
    ret = client.schema_change_add_column(table_name, v_add, is_wait_job=True)
    assert ret
    ret = client.desc_table(table_name, is_all=True)
    hidden_column = util.get_attr_condition_list(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert len(hidden_column) == 2, 'expect base table & mv have __DORIS_DELETE_SIGN__'
    hidden_column = util.get_attr_condition_list(ret, palo_job.DescInfoAll.Field, '__DORIS_VERSION_COL__')
    assert len(hidden_column) == 1, 'expect base table has __DORIS_VERSION_COL__'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, k9, 0, 0, 2 from %s.%s order by k1' \
           % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    # load and verify
    column_list = DATA.baseall_column_name_list
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, column_name_list=column_list,
                                              merge_type='MERGE', delete_on_predicates='k1 > 0')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret
    assert client.set_variables('show_hidden_columns', 0)
    ret = client.select_all(table_name)
    assert ret == ()
    client.clean(database_name)

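# Reading aid for the '0, 0, 2' constants in sql2 above (an inference from the
# expected results, not from documentation): with show_hidden_columns=1 a
# `select *` appends the hidden columns after the visible ones, so each row
# ends with the k_add default (0), __DORIS_DELETE_SIGN__ (0) and
# __DORIS_VERSION_COL__ (2).
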
def test_drop_column_delete_load():
    """
    {
    "title": "test_drop_column_delete_load",
    "describe": "dropping a column from a table with a rollup does not affect delete load; dropping a hidden column is expected to fail",
    "tag": "p1,system,fuzz"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    # create table, create rollup & load
    ret = client.create_table(table_name, DATA.baseall_column_no_agg_list,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.baseall_unique_key,
                              enable_unique_key_merge_on_write="false")
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table: %s' % table_name
    # load
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='APPEND')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret
    sql1 = 'select * from %s.%s order by k1'
    common.check2(client, sql1=sql1 % (database_name, table_name), sql2=sql1 % (check_db, baseall_tb))
    # enable
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_VERSION_COL__')
    # rollup
    rollup_list = ['k1', 'k2', 'k3', 'k4', 'k5', 'k7', 'k6', 'k10', 'k11']
    ret = client.create_rollup_table(table_name, index_name, rollup_list, is_wait=True)
    assert ret
    assert client.get_index(table_name, index_name)
    # drop a value column; key columns cannot be dropped in a unique-key table
    ret = client.schema_change_drop_column(table_name, ['k9'], is_wait_job=True)
    assert ret
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, 0, 2 from %s.%s order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    # merge load
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name,
                                              column_name_list=DATA.baseall_column_name_list,
                                              merge_type='MERGE', delete_on_predicates='k6 in ("false")')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, k6 in ("false"), 3 ' \
           'from %s.%s order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    client.clean(database_name)

def test_add_drop_partition_delete_load():
    """
    {
    "title": "test_add_drop_partition_delete_load",
    "describe": "adding and dropping partitions does not affect batch delete",
    "tag": "p1,system"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    partition_info = palo_client.PartitionInfo("k1",
                                               ["p1", "p2", "p3"],
                                               ["-10", "0", "10"])
    ret = client.create_table(table_name, DATA.baseall_column_no_agg_list,
                              distribution_info=DATA.baseall_distribution_info,
                              partition_info=partition_info,
                              keys_desc=DATA.baseall_unique_key)
    assert ret, 'create table failed'
    # merge into an empty table, delete on predicate hits all rows;
    # todo: set show_hidden_columns to inspect the hidden deleted rows
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name)
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True, max_filter_ratio=0.5)
    assert ret
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select * from %s.%s where k1 < 10 order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.add_partition(table_name, 'p4', 20), 'add partition failed'
    assert client.get_partition(table_name, 'p4')
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE',
                                              delete_on_predicates='k1 < 10',
                                              partition_list=['p1', 'p2', 'p3', 'p4'])
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True)
    assert ret, 'broker load failed'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select * from %s.%s where k1 >= 10 order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    ret = client.drop_partition(table_name, 'p3')
    assert ret, 'drop partition failed'
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='delete')
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True, max_filter_ratio=0.8)
    assert ret, 'broker load failed'
    ret = client.select_all(table_name)
    assert ret == ()
    client.clean(database_name)

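# Note on max_filter_ratio=0.8 above (an inference from the partition ranges):
# after p3 is dropped, source rows that fall into its range have no target
# partition and are filtered as error rows, so the DELETE-type load only
# succeeds with a filter ratio large enough to absorb them.
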
def test_delete_and_delete_load():
    """
    {
    "title": "test_delete_and_delete_load",
    "describe": "delete & truncate table; verify data correctness under show_hidden_columns mode",
    "tag": "p1,system"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    partition_info = palo_client.PartitionInfo("k1",
                                               ["p1", "p2", "p3", "p4"],
                                               ["-10", "0", "10", "20"])
    ret = client.create_table(table_name, DATA.baseall_column_no_agg_list,
                              distribution_info=DATA.baseall_distribution_info,
                              partition_info=partition_info,
                              keys_desc=DATA.baseall_unique_key)
    assert ret, 'create table failed'
    ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='MERGE', delete="k1 = 1")
    assert ret, 'stream load failed'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select * from %s.%s where k1 != 1 order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    assert client.set_variables('show_hidden_columns', 1)
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, k9, k1=1, 2 from %s.%s order by k1' \
           % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    ret = client.delete(table_name, 'k1=1', 'p3')
    assert ret, 'delete failed'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, k9, k1=1, 2 from %s.%s ' \
           'where k1 != 1 order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    ret = client.delete(table_name, 'k1>1', 'p4')
    assert ret, 'delete failed'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, k9, k1=1, 2 from %s.%s ' \
           'where k1 != 1 and k1 < 10 order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='delete')
    assert ret
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, k9, 1, 4 from %s.%s order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    assert client.truncate(table_name), 'truncate table failed'
    ret = client.select_all(table_name)
    assert ret == ()
    ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='MERGE', delete="k1 = 1")
    assert ret, 'stream load failed'
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select *, k1=1, 2 from %s.%s order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    client.clean(database_name)

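# Note on the DELETE statement vs. the delete sign above (an inference from
# the expected results): client.delete() filters rows at read time even in
# show_hidden_columns mode, so the k1=1 row, which the MERGE load had only
# soft-deleted via __DORIS_DELETE_SIGN__, disappears from the hidden-column
# view after the DELETE on p3.
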
def test_batch_delete_insert():
    """
    {
    "title": "test_batch_delete_insert",
    "describe": "with batch delete enabled, insert select & insert values succeed and the data is correct",
    "tag": "p1,system"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    partition_info = palo_client.PartitionInfo("k1",
                                               ["p1", "p2", "p3", "p4"],
                                               ["-10", "0", "10", "20"])
    ret = client.create_table(table_name, DATA.baseall_column_no_agg_list,
                              distribution_info=DATA.baseall_distribution_info,
                              partition_info=partition_info,
                              keys_desc=DATA.baseall_unique_key, set_null=True)
    assert ret, 'create table failed'
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.insert_select(table_name, 'select * from %s.%s' % (check_db, baseall_tb))
    assert ret
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, k9, 0, 2 from %s.%s order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    sql = 'insert into %s values(null, null, null, null, null, null, null, null, null, null, null)' % table_name
    ret = client.execute(sql)
    assert ret == (), 'insert values failed'
    ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='merge', delete='k1 is null')
    assert ret, 'stream load failed'
    sql2 = 'select null, null, null, null, null, null, null, null, null, null, null, 0, 2 ' \
           'union select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, k9, 0, 3 from %s.%s' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2, forced=True)
    ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='delete')
    assert ret, 'stream load failed'
    sql2 = 'select null, null, null, null, null, null, null, null, null, null, null, 0, 2 ' \
           'union select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, k9, 1, 4 from %s.%s' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2, forced=True)
    client.clean(database_name)

def test_batch_delete_some_times():
    """
    {
    "title": "test_batch_delete_some_times",
    "describe": "run delete loads repeatedly and verify the final result",
    "tag": "p1,system,stability"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    partition_info = palo_client.PartitionInfo("k1",
                                               ["p1", "p2", "p3", "p4"],
                                               ["-10", "0", "10", "20"])
    ret = client.create_table(table_name, DATA.baseall_column_no_agg_list,
                              distribution_info=DATA.baseall_distribution_info,
                              partition_info=partition_info,
                              keys_desc=DATA.baseall_unique_key)
    assert ret, 'create table failed'
    load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name)
    ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True, max_filter_ratio=0.5)
    assert ret
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select * from %s.%s order by k1' % (check_db, baseall_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    for i in range(20):
        ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='merge', delete="k1 > %s" % i)
        assert ret, 'stream load failed'
        time.sleep(3)
    common.check2(client, sql1=sql1, sql2=sql2)
    client.clean(database_name)

def test_delete_merge_limitation():
    """
    {
    "title": "test_delete_merge_limitation",
    "describe": "verify the restrictions on delete load",
    "tag": "p1,function,fuzz"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.baseall_column_no_agg_list,
                              distribution_info=DATA.baseall_distribution_info,
                              keys_desc=DATA.baseall_duplicate_key)
    assert ret, 'create table failed'

    ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='DELETE')
    assert not ret
    msg = 'Batch delete only supported in unique tables.'
    util.assert_return(False, msg, client.enable_feature_batch_delete, table_name)
    client.clean(database_name)

def test_delete_merge_duplicate_data():
    """
    {
    "title": "test_delete_merge_duplicate_data",
    "describe": "verify that when the same key appears several times in one delete load file, the last occurrence wins; DELETE ON references a value column; verify the result",
    "tag": "p1,function,fuzz"
    }
    """
    database_name, table_name, index_name = util.gen_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    ret = client.create_table(table_name, DATA.tinyint_column_no_agg_list,
                              distribution_info=DATA.hash_distribution_info,
                              keys_desc='UNIQUE KEY(k1)')
    assert ret, 'create table failed'
    assert client.show_tables(table_name)
    try:
        ret = client.enable_feature_batch_delete(table_name)
        assert ret, 'enable batch delete feature failed'
    except Exception as e:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__')
    ret = client.stream_load(table_name, FILE.test_tinyint_file, max_filter_ratio=0.1,
                             merge_type='merge', delete='v1=1')
    assert ret, 'stream load failed'
    ret = client.stream_load(table_name, FILE.test_tinyint_file, max_filter_ratio=0.1,
                             merge_type='merge', delete='v1=3')
    assert ret, 'stream load failed'
    assert client.set_variables('show_hidden_columns', 0)
    ret = client.select_all(table_name)
    assert ret == ()
    client.clean(database_name)

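# Sketch of the last-occurrence-wins rule exercised above (the helper and its
# simplified row model are our own; nothing in the suite calls it):
def _last_key_wins_sketch(loaded_rows):
    """loaded_rows: iterable of (key, value, delete_sign) in file order.
    Within one load on a unique-key table a later occurrence of a key replaces
    the earlier one, delete sign included; rows whose final sign is 1 stay
    invisible afterwards."""
    latest = {}
    for key, value, sign in loaded_rows:
        latest[key] = (value, sign)
    return {key: value for key, (value, sign) in latest.items() if sign == 0}
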
if __name__ == '__main__':
    setup_module()