[fix](broker-load) fix error when using multiple data descriptions for the same table in load stmt (#22666)

For a load request, there are two tuples on the scan node: the input tuple and the output tuple.
The input tuple describes the rows as read from the file; they are converted to the output tuple according to the user-specified column mappings.

Broker load supports a different column mapping in each data description targeting the same table (or partition); for example, one DATA INFILE clause may load (p_partkey, p_name, p_mfgr) while another loads (p_partkey, p_brand, p_type).
So across the scanners of one scan node, the output tuple is always the same, but the input tuple can differ.

The previous implementation saved the input tuple at the scan node level, so all scanners shared the same input tuple, which is incorrect.
This PR removes the input tuple from the scan node and stores it in each scanner instead.
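As a minimal sketch of the idea (hypothetical stand-in types, not the real Doris classes): every scanner shares one output tuple, but each scan range carries its own src_tuple_id, so the input tuple must be resolved per scanner, exactly as the VFileScanner constructor change below does.

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

using TupleId = int32_t;

struct TupleDescriptor {
    std::vector<std::string> slots;
};

// Per scan range: which tuple the file is read as (src) and which
// tuple the table expects (dest). Field names follow TFileScanRangeParams.
struct ScanRangeParams {
    TupleId src_tuple_id;
    TupleId dest_tuple_id;
};

struct Scanner {
    const TupleDescriptor* input_tuple_desc;   // per scanner: the fix
    const TupleDescriptor* output_tuple_desc;  // shared by all scanners
};

int main() {
    std::map<TupleId, TupleDescriptor> desc_tbl = {
            {0, {{"p_partkey", "p_name", "p_mfgr"}}},   // mapping of data description 1
            {1, {{"p_partkey", "p_brand", "p_type"}}},  // mapping of data description 2
            {2, {{"p_partkey", "p_name", "p_mfgr", "p_brand", "p_type"}}}}; // target table

    // Two data descriptions for the same table: same dest, different src.
    std::vector<ScanRangeParams> ranges = {{0, 2}, {1, 2}};

    for (const auto& params : ranges) {
        // Each scanner resolves its own input tuple from its own params.
        // Storing one input tuple on the scan node would force both
        // scanners onto mapping 1 and misread the second file.
        Scanner scanner {&desc_tbl[params.src_tuple_id], &desc_tbl[params.dest_tuple_id]};
        std::cout << "read " << scanner.input_tuple_desc->slots.size() << " src slots into "
                  << scanner.output_tuple_desc->slots.size() << " dest slots\n";
    }
    return 0;
}
```

The regression test added at the end of this PR exercises exactly this scenario: two DATA INFILE clauses with different column lists loading into one table, followed by checks that both mappings produced non-null columns.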
Author: Mingyu Chen
Committed by GitHub: 2023-08-07 20:03:03 +08:00
parent f074909d3c
commit c9dc715c5d
12 changed files with 104 additions and 8260 deletions


@@ -53,7 +53,6 @@ Status NewFileScanNode::prepare(RuntimeState* state) {
     if (state->get_query_ctx() != nullptr &&
         state->get_query_ctx()->file_scan_range_params_map.count(id()) > 0) {
         TFileScanRangeParams& params = state->get_query_ctx()->file_scan_range_params_map[id()];
-        _input_tuple_id = params.src_tuple_id;
         _output_tuple_id = params.dest_tuple_id;
     }
     return Status::OK();
@@ -85,8 +84,6 @@ void NewFileScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_
         scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) {
         // for compatibility.
         // in new implement, the tuple id is set in prepare phase
-        _input_tuple_id =
-                scan_ranges[0].scan_range.ext_scan_range.file_scan_range.params.src_tuple_id;
         _output_tuple_id =
                 scan_ranges[0].scan_range.ext_scan_range.file_scan_range.params.dest_tuple_id;
     }


@@ -29,14 +29,12 @@ class PipScannerContext : public vectorized::ScannerContext {
 public:
     PipScannerContext(RuntimeState* state, vectorized::VScanNode* parent,
-                      const TupleDescriptor* input_tuple_desc,
                       const TupleDescriptor* output_tuple_desc,
                       const std::list<vectorized::VScannerSPtr>& scanners, int64_t limit,
                       int64_t max_bytes_in_blocks_queue, const std::vector<int>& col_distribute_ids,
                       const int num_parallel_instances)
-            : vectorized::ScannerContext(state, parent, input_tuple_desc, output_tuple_desc,
-                                         scanners, limit, max_bytes_in_blocks_queue,
-                                         num_parallel_instances),
+            : vectorized::ScannerContext(state, parent, output_tuple_desc, scanners, limit,
+                                         max_bytes_in_blocks_queue, num_parallel_instances),
              _col_distribute_ids(col_distribute_ids),
              _need_colocate_distribute(!_col_distribute_ids.empty()) {}


@@ -43,13 +43,11 @@
 namespace doris::vectorized {

 ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::VScanNode* parent,
-                               const doris::TupleDescriptor* input_tuple_desc,
                                const doris::TupleDescriptor* output_tuple_desc,
                                const std::list<VScannerSPtr>& scanners_, int64_t limit_,
                                int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances)
         : _state(state_),
           _parent(parent),
-          _input_tuple_desc(input_tuple_desc),
           _output_tuple_desc(output_tuple_desc),
           _process_status(Status::OK()),
           _batch_size(state_->batch_size()),
@@ -69,7 +67,6 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::V
 // After init function call, should not access _parent
 Status ScannerContext::init() {
-    _real_tuple_desc = _input_tuple_desc != nullptr ? _input_tuple_desc : _output_tuple_desc;
     // 1. Calculate max concurrency
     // TODO: now the max thread num <= config::doris_scanner_thread_pool_thread_num / 4
     // should find a more reasonable value.
@@ -154,7 +151,7 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block,
     }
     COUNTER_UPDATE(_newly_create_free_blocks_num, 1);
-    return vectorized::Block::create_unique(_real_tuple_desc->slots(), _batch_size,
+    return vectorized::Block::create_unique(_output_tuple_desc->slots(), _batch_size,
                                             true /*ignore invalid slots*/);
 }


@@ -63,7 +63,7 @@ class ScannerContext {
     ENABLE_FACTORY_CREATOR(ScannerContext);

 public:
-    ScannerContext(RuntimeState* state_, VScanNode* parent, const TupleDescriptor* input_tuple_desc,
+    ScannerContext(RuntimeState* state_, VScanNode* parent,
                    const TupleDescriptor* output_tuple_desc,
                    const std::list<VScannerSPtr>& scanners_, int64_t limit_,
                    int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances = 0);
@@ -173,11 +173,7 @@ protected:
     VScanNode* _parent;
     // the comment of same fields in VScanNode
-    const TupleDescriptor* _input_tuple_desc;
     const TupleDescriptor* _output_tuple_desc;
-    // If _input_tuple_desc is not null, _real_tuple_desc point to _input_tuple_desc,
-    // otherwise, _real_tuple_desc point to _output_tuple_desc
-    const TupleDescriptor* _real_tuple_desc;

     // _transfer_lock is used to protect the critical section
     // where the ScanNode and ScannerScheduler interact.


@@ -106,6 +106,12 @@ VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t
         CHECK(scan_range.__isset.params);
         _params = &(scan_range.params);
     }
+
+    // For load scanner, there are input and output tuple.
+    // For query scanner, there is only output tuple
+    _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_params->src_tuple_id);
+    _real_tuple_desc = _input_tuple_desc == nullptr ? _output_tuple_desc : _input_tuple_desc;
+    _is_load = (_input_tuple_desc != nullptr);
 }

 Status VFileScanner::prepare(


@@ -179,6 +179,13 @@ private:
     // save the path of current scan range
     std::string _current_range_path = "";

+    // Only for load scan node.
+    const TupleDescriptor* _input_tuple_desc = nullptr;
+    // If _input_tuple_desc is set,
+    // the _real_tuple_desc will point to _input_tuple_desc,
+    // otherwise, point to _output_tuple_desc
+    const TupleDescriptor* _real_tuple_desc = nullptr;
+
 private:
     Status _init_expr_ctxes();
     Status _init_src_block(Block* block);


@@ -178,7 +178,6 @@ Status VScanNode::alloc_resource(RuntimeState* state) {
     if (_opened) {
         return Status::OK();
     }
-    _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_input_tuple_id);
     _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
     RETURN_IF_ERROR(ExecNode::alloc_resource(state));
     RETURN_IF_ERROR(_acquire_runtime_filter());
@@ -316,12 +315,11 @@ Status VScanNode::_start_scanners(const std::list<VScannerSPtr>& scanners,
     if (_is_pipeline_scan) {
         int max_queue_size = _shared_scan_opt ? std::max(query_parallel_instance_num, 1) : 1;
         _scanner_ctx = pipeline::PipScannerContext::create_shared(
-                _state, this, _input_tuple_desc, _output_tuple_desc, scanners, limit(),
-                _state->scan_queue_mem_limit(), _col_distribute_ids, max_queue_size);
+                _state, this, _output_tuple_desc, scanners, limit(), _state->scan_queue_mem_limit(),
+                _col_distribute_ids, max_queue_size);
     } else {
-        _scanner_ctx =
-                ScannerContext::create_shared(_state, this, _input_tuple_desc, _output_tuple_desc,
-                                              scanners, limit(), _state->scan_queue_mem_limit());
+        _scanner_ctx = ScannerContext::create_shared(_state, this, _output_tuple_desc, scanners,
+                                                     limit(), _state->scan_queue_mem_limit());
     }
     return Status::OK();
 }


@@ -147,9 +147,7 @@ public:
     int runtime_filter_num() const { return (int)_runtime_filter_ctxs.size(); }

-    TupleId input_tuple_id() const { return _input_tuple_id; }
     TupleId output_tuple_id() const { return _output_tuple_id; }
-    const TupleDescriptor* input_tuple_desc() const { return _input_tuple_desc; }
     const TupleDescriptor* output_tuple_desc() const { return _output_tuple_desc; }

     Status alloc_resource(RuntimeState* state) override;
@@ -242,11 +240,8 @@ protected:
     bool _is_pipeline_scan = false;
     bool _shared_scan_opt = false;

-    // For load scan node, there should be both input and output tuple descriptor.
-    // For query scan node, there is only output_tuple_desc.
-    TupleId _input_tuple_id = -1;
+    // the output tuple of this scan node
     TupleId _output_tuple_id = -1;
-    const TupleDescriptor* _input_tuple_desc = nullptr;
     const TupleDescriptor* _output_tuple_desc = nullptr;

     doris::Mutex _block_lock;


@@ -33,11 +33,8 @@ VScanner::VScanner(RuntimeState* state, VScanNode* parent, int64_t limit, Runtim
           _parent(parent),
           _limit(limit),
           _profile(profile),
-          _input_tuple_desc(parent->input_tuple_desc()),
           _output_tuple_desc(parent->output_tuple_desc()) {
-    _real_tuple_desc = _input_tuple_desc != nullptr ? _input_tuple_desc : _output_tuple_desc;
     _total_rf_num = _parent->runtime_filter_num();
-    _is_load = (_input_tuple_desc != nullptr);
 }

 Status VScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conjuncts) {


@@ -164,9 +164,7 @@ protected:
     RuntimeProfile* _profile;

-    const TupleDescriptor* _input_tuple_desc = nullptr;
     const TupleDescriptor* _output_tuple_desc = nullptr;
-    const TupleDescriptor* _real_tuple_desc = nullptr;

     // If _input_tuple_desc is set, the scanner will read data into
     // this _input_block first, then convert to the output block.
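Taken together, the header changes above leave the tuple descriptors laid out as follows. This is a hedged before/after sketch using simplified stand-in structs (only the field names come from the diff):

```cpp
struct TupleDescriptor {};

// Before this PR: the scan node owned both descriptors, and VScanner
// copied them from its parent, so every scanner shared one input tuple.
struct VScanNodeBefore {
    const TupleDescriptor* _input_tuple_desc = nullptr;  // shared by all scanners: the bug
    const TupleDescriptor* _output_tuple_desc = nullptr;
};

// After this PR: VScanNode and ScannerContext keep only the output tuple;
// the load-only input/real descriptors live in VFileScanner and are
// resolved from each scanner's own scan range params (src_tuple_id).
struct VScanNodeAfter {
    const TupleDescriptor* _output_tuple_desc = nullptr;
};

struct VFileScannerAfter {
    const TupleDescriptor* _input_tuple_desc = nullptr; // per scan range
    const TupleDescriptor* _real_tuple_desc = nullptr;  // input if set, else output
};

int main() {
    TupleDescriptor input, output;
    VFileScannerAfter scanner {&input, nullptr};
    // Resolution rule from the diff: input tuple if set, otherwise output.
    scanner._real_tuple_desc = scanner._input_tuple_desc ? scanner._input_tuple_desc : &output;
    return scanner._real_tuple_desc == &input ? 0 : 1;
}
```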

(One file's diff suppressed because it is too large.)


@@ -38,7 +38,6 @@ suite("test_broker_load_p2", "p2") {
                   "parquet_s3_case6", // normal
                   "parquet_s3_case7", // col5 will be ignored, load normally
                   "parquet_s3_case8", // first column in table is not specified, will load default value for it.
-                  "parquet_s3_case9", // first column in table is not specified, will load default value for it.
                   "orc_s3_case1", // table column capitalize firsrt
                   "orc_s3_case2", // table column lowercase * load column lowercase * orc file lowercase
                   "orc_s3_case3", // table column lowercase * load column uppercase * orc file lowercase
@@ -71,7 +70,6 @@ suite("test_broker_load_p2", "p2") {
                  "s3://doris-build-1308700295/regression/load/data/part*",
                  "s3://doris-build-1308700295/regression/load/data/part*",
                  "s3://doris-build-1308700295/regression/load/data/part*",
-                 "s3://doris-build-1308700295/regression/load/data/random_all_types/part*",
                  "s3://doris-build-1308700295/regression/load/data/orc/hits_100k_rows.orc",
                  "s3://doris-build-1308700295/regression/load/data/orc/hits_10k_rows_lowercase.orc",
                  "s3://doris-build-1308700295/regression/load/data/orc/hits_10k_rows_lowercase.orc",
@@ -104,7 +102,6 @@ suite("test_broker_load_p2", "p2") {
                         """p_partkey, p_name, p_mfgr, p_brand""",
                         """p_partkey, p_name, p_mfgr, p_brand""",
                         """p_name, p_mfgr""",
-                        """""",
"""watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase,advengineid,isartifical,windowclientwidth,windowclientheight,clienttimezone,clienteventtime,silverlightversion1,silverlightversion2,silverlightversion3,silverlightversion4,pagecharset,codeversion,islink,isdownload,isnotbounce,funiqid,originalurl,hid,isoldcounter,isevent,isparameter,dontcounthits,withhash,hitcolor,localeventtime,age,sex,income,interests,robotness,remoteip,windowname,openername,historylength,browserlanguage,browsercountry,socialnetwork,socialaction,httperror,sendtiming,dnstiming,connecttiming,responsestarttiming,responseendtiming,fetchtiming,socialsourcenetworkid,socialsourcepage,paramprice,paramorderid,paramcurrency,paramcurrencyid,openstatservicename,openstatcampaignid,openstatadid,openstatsourceid,utmsource,utmmedium,utmcampaign,utmcontent,utmterm,fromtag,hasgclid,refererhash,urlhash,clid""",
//TODO: comment blow 8 rows after jibing fix
"""watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase,advengineid,isartifical,windowclientwidth,windowclientheight,clienttimezone,clienteventtime,silverlightversion1,silverlightversion2,silverlightversion3,silverlightversion4,pagecharset,codeversion,islink,isdownload,isnotbounce,funiqid,originalurl,hid,isoldcounter,isevent,isparameter,dontcounthits,withhash,hitcolor,localeventtime,age,sex,income,interests,robotness,remoteip,windowname,openername,historylength,browserlanguage,browsercountry,socialnetwork,socialaction,httperror,sendtiming,dnstiming,connecttiming,responsestarttiming,responseendtiming,fetchtiming,socialsourcenetworkid,socialsourcepage,paramprice,paramorderid,paramcurrency,paramcurrencyid,openstatservicename,openstatcampaignid,openstatadid,openstatsourceid,utmsource,utmmedium,utmcampaign,utmcontent,utmterm,fromtag,hasgclid,refererhash,urlhash,clid""",
@@ -126,8 +123,8 @@ suite("test_broker_load_p2", "p2") {
// """WATCHID,JAVAENABLE,TITLE,GOODEVENT,EVENTTIME,EVENTDATE,COUNTERID,CLIENTIP,REGIONID,USERID,COUNTERCLASS,OS,USERAGENT,URL,REFERER,ISREFRESH,REFERERCATEGORYID,REFERERREGIONID,URLCATEGORYID,URLREGIONID,RESOLUTIONWIDTH,RESOLUTIONHEIGHT,RESOLUTIONDEPTH,FLASHMAJOR,FLASHMINOR,FLASHMINOR2,NETMAJOR,NETMINOR,USERAGENTMAJOR,USERAGENTMINOR,COOKIEENABLE,JAVASCRIPTENABLE,ISMOBILE,MOBILEPHONE,MOBILEPHONEMODEL,PARAMS,IPNETWORKID,TRAFICSOURCEID,SEARCHENGINEID,SEARCHPHRASE,ADVENGINEID,ISARTIFICAL,WINDOWCLIENTWIDTH,WINDOWCLIENTHEIGHT,CLIENTTIMEZONE,CLIENTEVENTTIME,SILVERLIGHTVERSION1,SILVERLIGHTVERSION2,SILVERLIGHTVERSION3,SILVERLIGHTVERSION4,PAGECHARSET,CODEVERSION,ISLINK,ISDOWNLOAD,ISNOTBOUNCE,FUNIQID,ORIGINALURL,HID,ISOLDCOUNTER,ISEVENT,ISPARAMETER,DONTCOUNTHITS,WITHHASH,HITCOLOR,LOCALEVENTTIME,AGE,SEX,INCOME,INTERESTS,ROBOTNESS,REMOTEIP,WINDOWNAME,OPENERNAME,HISTORYLENGTH,BROWSERLANGUAGE,BROWSERCOUNTRY,SOCIALNETWORK,SOCIALACTION,HTTPERROR,SENDTIMING,DNSTIMING,CONNECTTIMING,RESPONSESTARTTIMING,RESPONSEENDTIMING,FETCHTIMING,SOCIALSOURCENETWORKID,SOCIALSOURCEPAGE,PARAMPRICE,PARAMORDERID,PARAMCURRENCY,PARAMCURRENCYID,OPENSTATSERVICENAME,OPENSTATCAMPAIGNID,OPENSTATADID,OPENSTATSOURCEID,UTMSOURCE,UTMMEDIUM,UTMCAMPAIGN,UTMCONTENT,UTMTERM,FROMTAG,HASGCLID,REFERERHASH,URLHASH,CLID""",
// """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase,advengineid,isartifical,windowclientwidth,windowclientheight,clienttimezone,clienteventtime,silverlightversion1,silverlightversion2,silverlightversion3,silverlightversion4,pagecharset,codeversion,islink,isdownload,isnotbounce,funiqid,originalurl,hid,isoldcounter,isevent,isparameter,dontcounthits,withhash,hitcolor,localeventtime,age,sex,income,interests,robotness,remoteip,windowname,openername,historylength,browserlanguage,browsercountry,socialnetwork,socialaction,httperror,sendtiming,dnstiming,connecttiming,responsestarttiming,responseendtiming,fetchtiming,socialsourcenetworkid,socialsourcepage,paramprice,paramorderid,paramcurrency,paramcurrencyid,openstatservicename,openstatcampaignid,openstatadid,openstatsourceid,utmsource,utmmedium,utmcampaign,utmcontent,utmterm,fromtag,hasgclid,refererhash,urlhash,clid""",
]
-    def column_in_paths = ["", "", "", "", "", "", "", "", "", "", "", "", "COLUMNS FROM PATH AS (city)", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""]
-    def preceding_filters = ["", "", "", "", "", "", "", "", "", "", "", "preceding filter p_size < 10", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""]
+    def column_in_paths = ["", "", "", "", "", "", "", "", "", "", "", "", "COLUMNS FROM PATH AS (city)", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""]
+    def preceding_filters = ["", "", "", "", "", "", "", "", "", "", "", "preceding filter p_size < 10", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""]
def set_values = ["",
"",
"SET(comment=p_comment, retailprice=p_retailprice, container=p_container, size=p_size, type=p_type, brand=p_brand, mfgr=p_mfgr, name=p_name, partkey=p_partkey)",
@@ -158,12 +155,11 @@ suite("test_broker_load_p2", "p2") {
                       "",
                       "",
                       "",
-                      "",
                       ""
                      ]
-    def where_exprs = ["", "", "", "", "", "", "", "", "", "", "", "where p_partkey>10", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""]
+    def where_exprs = ["", "", "", "", "", "", "", "", "", "", "", "where p_partkey>10", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""]
-    def line_delimiters = ["", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "\u0007"]
+    def line_delimiters = ["", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "\u0007"]
def etl_info = ["unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
"unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
@@ -186,7 +182,6 @@ suite("test_broker_load_p2", "p2") {
                     "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
                     "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
                     "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
-                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=4096",
                     "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=100000",
                     "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=10000",
                     "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=10000",
@@ -229,7 +224,6 @@ suite("test_broker_load_p2", "p2") {
                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
-                    "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0"
                    ]
@@ -270,9 +264,10 @@ suite("test_broker_load_p2", "p2") {
         String sk = getS3SK()
         String enabled = context.config.otherConfigs.get("enableBrokerLoad")

-        def do_load_job = {uuid, path, table, columns, column_in_path, preceding_filter,
+        def do_load_job = {label, path, table, columns, column_in_path, preceding_filter,
                            set_value, where_expr, line_delimiter ->
             String columns_str = ("$columns" != "") ? "($columns)" : "";
+            String line_term = ("$line_delimiter" != "") ? "lines terminated by '$line_delimiter'" : "";
             String format_str
             if (table.startsWith("orc_s3_case")) {
                 format_str = "ORC"
@@ -281,17 +276,17 @@ suite("test_broker_load_p2", "p2") {
             } else {
                 format_str = "PARQUET"
             }
-            sql """
-                LOAD LABEL $uuid (
+            def sql_str = """
+                LOAD LABEL $label (
                     DATA INFILE("$path")
                     INTO TABLE $table
+                    $line_term
                     FORMAT AS $format_str
                     $columns_str
                     $column_in_path
                     $preceding_filter
                     $set_value
                     $where_expr
-                    $line_delimiter
                 )
                 WITH S3 (
                     "AWS_ACCESS_KEY" = "$ak",
@@ -303,7 +298,9 @@ suite("test_broker_load_p2", "p2") {
                     "use_new_load_scan_node" = "true"
                 )
             """
-            logger.info("Submit load with lable: $uuid, table: $table, path: $path")
+            logger.info("submit sql: ${sql_str}");
+            sql """${sql_str}"""
+            logger.info("Submit load with lable: $label, table: $table, path: $path")
         }

         if (enabled != null && enabled.equalsIgnoreCase("true")) {
@@ -315,8 +312,9 @@ suite("test_broker_load_p2", "p2") {
                 sql new File("""${context.file.parent}/ddl/${table}_create.sql""").text

                 def uuid = UUID.randomUUID().toString().replace("-", "0")
-                uuids.add(uuid)
-                do_load_job.call(uuid, paths[i], table, columns_list[i], column_in_paths[i], preceding_filters[i],
+                def the_label = "L${i}_${uuid}"
+                uuids.add(the_label)
+                do_load_job.call(the_label, paths[i], table, columns_list[i], column_in_paths[i], preceding_filters[i],
                                  set_values[i], where_exprs[i], line_delimiters[i])
                 i++
             }
@@ -359,7 +357,69 @@ suite("test_broker_load_p2", "p2") {
         order_qt_parquet_s3_case6 """select count(*) from parquet_s3_case6 where p_partkey < 100000"""
         order_qt_parquet_s3_case7 """select count(*) from parquet_s3_case7 where col4=4"""
         order_qt_parquet_s3_case8 """ select count(*) from parquet_s3_case8 where p_partkey=1"""
-        order_qt_parquet_s3_case9 """ select * from parquet_s3_case9"""
+
+        // pr 22666
+        def tbl_22666 = "part_22666"
+        sql """drop table if exists ${tbl_22666} force"""
+        sql """
+            CREATE TABLE ${tbl_22666} (
+                p_partkey     int NULL,
+                p_name        VARCHAR(55) NULL,
+                p_mfgr        VARCHAR(25) NULL,
+                p_brand       VARCHAR(10) NULL,
+                p_type        VARCHAR(25) NULL,
+                p_size        int NULL,
+                p_container   VARCHAR(10) NULL,
+                p_retailprice decimal(15, 2) NULL,
+                p_comment     VARCHAR(23) NULL
+            )ENGINE=OLAP
+            DUPLICATE KEY(`p_partkey`)
+            DISTRIBUTED BY HASH(`p_partkey`) BUCKETS 3
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+        """
+
+        def label_22666 = "part_" + UUID.randomUUID().toString().replace("-", "0")
+        sql """
+            LOAD LABEL ${label_22666} (
+                DATA INFILE("s3://doris-build-1308700295/regression/load/data/part0.parquet")
+                INTO TABLE ${tbl_22666}
+                FORMAT AS "PARQUET"
+                (p_partkey, p_name, p_mfgr),
+                DATA INFILE("s3://doris-build-1308700295/regression/load/data/part1.parquet")
+                INTO TABLE ${tbl_22666}
+                FORMAT AS "PARQUET"
+                (p_partkey, p_brand, p_type)
+            )
+            WITH S3 (
+                "AWS_ACCESS_KEY" = "$ak",
+                "AWS_SECRET_KEY" = "$sk",
+                "AWS_ENDPOINT" = "cos.ap-beijing.myqcloud.com",
+                "AWS_REGION" = "ap-beijing"
+            );
+        """
+
+        def max_try_milli_secs = 600000
+        while (max_try_milli_secs > 0) {
+            String[][] result = sql """ show load where label="$label_22666" order by createtime desc limit 1; """
+            if (result[0][2].equals("FINISHED")) {
+                logger.info("Load FINISHED " + label_22666)
+                break;
+            }
+            if (result[0][2].equals("CANCELLED")) {
+                assertTrue(false, "load failed: $result")
+                break;
+            }
+            Thread.sleep(1000)
+            max_try_milli_secs -= 1000
+            if(max_try_milli_secs <= 0) {
+                assertTrue(1 == 2, "load Timeout: $label_22666")
+            }
+        }
+
+        order_qt_pr22666_1 """ select count(*) from ${tbl_22666} where p_brand is not null limit 10;"""
+        order_qt_pr22666_2 """ select count(*) from ${tbl_22666} where p_name is not null limit 10;"""
     } finally {
         for (String table in tables) {