[fix](thrift)cancel thrift msg max size limit (#25194)
On Thrift 0.14.0+, need use TConfiguration to raise the max message size. see https://github.com/apache/arrow/pull/11123/files
This commit is contained in:
@ -135,8 +135,14 @@ Status deserialize_thrift_msg(const uint8_t* buf, uint32_t* len, bool compact,
|
||||
// Deserialize msg bytes into c++ thrift msg using memory
|
||||
// transport. TMemoryBuffer is not const-safe, although we use it in
|
||||
// a const-safe way, so we have to explicitly cast away the const.
|
||||
auto conf = std::make_shared<apache::thrift::TConfiguration>();
|
||||
// On Thrift 0.14.0+, need use TConfiguration to raise the max message size.
|
||||
// max message size is 100MB default, so make it unlimited.
|
||||
conf->setMaxMessageSize(std::numeric_limits<int>::max());
|
||||
std::shared_ptr<apache::thrift::transport::TMemoryBuffer> tmem_transport(
|
||||
new apache::thrift::transport::TMemoryBuffer(const_cast<uint8_t*>(buf), *len));
|
||||
new apache::thrift::transport::TMemoryBuffer(
|
||||
const_cast<uint8_t*>(buf), *len,
|
||||
apache::thrift::transport::TMemoryBuffer::OBSERVE, conf));
|
||||
std::shared_ptr<apache::thrift::protocol::TProtocol> tproto =
|
||||
create_deserialize_protocol(tmem_transport, compact);
|
||||
|
||||
|
||||
@ -0,0 +1,258 @@
|
||||
CREATE TABLE IF NOT EXISTS parquet_large_metadata_100mb
|
||||
(
|
||||
`0` BIGINT,
|
||||
`1` BIGINT,
|
||||
`2` BIGINT,
|
||||
`3` BIGINT,
|
||||
`4` BIGINT,
|
||||
`5` BIGINT,
|
||||
`6` BIGINT,
|
||||
`7` BIGINT,
|
||||
`8` BIGINT,
|
||||
`9` BIGINT,
|
||||
`10` BIGINT,
|
||||
`11` BIGINT,
|
||||
`12` BIGINT,
|
||||
`13` BIGINT,
|
||||
`14` BIGINT,
|
||||
`15` BIGINT,
|
||||
`16` BIGINT,
|
||||
`17` BIGINT,
|
||||
`18` BIGINT,
|
||||
`19` BIGINT,
|
||||
`20` BIGINT,
|
||||
`21` BIGINT,
|
||||
`22` BIGINT,
|
||||
`23` BIGINT,
|
||||
`24` BIGINT,
|
||||
`25` BIGINT,
|
||||
`26` BIGINT,
|
||||
`27` BIGINT,
|
||||
`28` BIGINT,
|
||||
`29` BIGINT,
|
||||
`30` BIGINT,
|
||||
`31` BIGINT,
|
||||
`32` BIGINT,
|
||||
`33` BIGINT,
|
||||
`34` BIGINT,
|
||||
`35` BIGINT,
|
||||
`36` BIGINT,
|
||||
`37` BIGINT,
|
||||
`38` BIGINT,
|
||||
`39` BIGINT,
|
||||
`40` BIGINT,
|
||||
`41` BIGINT,
|
||||
`42` BIGINT,
|
||||
`43` BIGINT,
|
||||
`44` BIGINT,
|
||||
`45` BIGINT,
|
||||
`46` BIGINT,
|
||||
`47` BIGINT,
|
||||
`48` BIGINT,
|
||||
`49` BIGINT,
|
||||
`50` BIGINT,
|
||||
`51` BIGINT,
|
||||
`52` BIGINT,
|
||||
`53` BIGINT,
|
||||
`54` BIGINT,
|
||||
`55` BIGINT,
|
||||
`56` BIGINT,
|
||||
`57` BIGINT,
|
||||
`58` BIGINT,
|
||||
`59` BIGINT,
|
||||
`60` BIGINT,
|
||||
`61` BIGINT,
|
||||
`62` BIGINT,
|
||||
`63` BIGINT,
|
||||
`64` BIGINT,
|
||||
`65` BIGINT,
|
||||
`66` BIGINT,
|
||||
`67` BIGINT,
|
||||
`68` BIGINT,
|
||||
`69` BIGINT,
|
||||
`70` BIGINT,
|
||||
`71` BIGINT,
|
||||
`72` BIGINT,
|
||||
`73` BIGINT,
|
||||
`74` BIGINT,
|
||||
`75` BIGINT,
|
||||
`76` BIGINT,
|
||||
`77` BIGINT,
|
||||
`78` BIGINT,
|
||||
`79` BIGINT,
|
||||
`80` BIGINT,
|
||||
`81` BIGINT,
|
||||
`82` BIGINT,
|
||||
`83` BIGINT,
|
||||
`84` BIGINT,
|
||||
`85` BIGINT,
|
||||
`86` BIGINT,
|
||||
`87` BIGINT,
|
||||
`88` BIGINT,
|
||||
`89` BIGINT,
|
||||
`90` BIGINT,
|
||||
`91` BIGINT,
|
||||
`92` BIGINT,
|
||||
`93` BIGINT,
|
||||
`94` BIGINT,
|
||||
`95` BIGINT,
|
||||
`96` BIGINT,
|
||||
`97` BIGINT,
|
||||
`98` BIGINT,
|
||||
`99` BIGINT,
|
||||
`100` BIGINT,
|
||||
`101` BIGINT,
|
||||
`102` BIGINT,
|
||||
`103` BIGINT,
|
||||
`104` BIGINT,
|
||||
`105` BIGINT,
|
||||
`106` BIGINT,
|
||||
`107` BIGINT,
|
||||
`108` BIGINT,
|
||||
`109` BIGINT,
|
||||
`110` BIGINT,
|
||||
`111` BIGINT,
|
||||
`112` BIGINT,
|
||||
`113` BIGINT,
|
||||
`114` BIGINT,
|
||||
`115` BIGINT,
|
||||
`116` BIGINT,
|
||||
`117` BIGINT,
|
||||
`118` BIGINT,
|
||||
`119` BIGINT,
|
||||
`120` BIGINT,
|
||||
`121` BIGINT,
|
||||
`122` BIGINT,
|
||||
`123` BIGINT,
|
||||
`124` BIGINT,
|
||||
`125` BIGINT,
|
||||
`126` BIGINT,
|
||||
`127` BIGINT,
|
||||
`128` BIGINT,
|
||||
`129` BIGINT,
|
||||
`130` BIGINT,
|
||||
`131` BIGINT,
|
||||
`132` BIGINT,
|
||||
`133` BIGINT,
|
||||
`134` BIGINT,
|
||||
`135` BIGINT,
|
||||
`136` BIGINT,
|
||||
`137` BIGINT,
|
||||
`138` BIGINT,
|
||||
`139` BIGINT,
|
||||
`140` BIGINT,
|
||||
`141` BIGINT,
|
||||
`142` BIGINT,
|
||||
`143` BIGINT,
|
||||
`144` BIGINT,
|
||||
`145` BIGINT,
|
||||
`146` BIGINT,
|
||||
`147` BIGINT,
|
||||
`148` BIGINT,
|
||||
`149` BIGINT,
|
||||
`150` BIGINT,
|
||||
`151` BIGINT,
|
||||
`152` BIGINT,
|
||||
`153` BIGINT,
|
||||
`154` BIGINT,
|
||||
`155` BIGINT,
|
||||
`156` BIGINT,
|
||||
`157` BIGINT,
|
||||
`158` BIGINT,
|
||||
`159` BIGINT,
|
||||
`160` BIGINT,
|
||||
`161` BIGINT,
|
||||
`162` BIGINT,
|
||||
`163` BIGINT,
|
||||
`164` BIGINT,
|
||||
`165` BIGINT,
|
||||
`166` BIGINT,
|
||||
`167` BIGINT,
|
||||
`168` BIGINT,
|
||||
`169` BIGINT,
|
||||
`170` BIGINT,
|
||||
`171` BIGINT,
|
||||
`172` BIGINT,
|
||||
`173` BIGINT,
|
||||
`174` BIGINT,
|
||||
`175` BIGINT,
|
||||
`176` BIGINT,
|
||||
`177` BIGINT,
|
||||
`178` BIGINT,
|
||||
`179` BIGINT,
|
||||
`180` BIGINT,
|
||||
`181` BIGINT,
|
||||
`182` BIGINT,
|
||||
`183` BIGINT,
|
||||
`184` BIGINT,
|
||||
`185` BIGINT,
|
||||
`186` BIGINT,
|
||||
`187` BIGINT,
|
||||
`188` BIGINT,
|
||||
`189` BIGINT,
|
||||
`190` BIGINT,
|
||||
`191` BIGINT,
|
||||
`192` BIGINT,
|
||||
`193` BIGINT,
|
||||
`194` BIGINT,
|
||||
`195` BIGINT,
|
||||
`196` BIGINT,
|
||||
`197` BIGINT,
|
||||
`198` BIGINT,
|
||||
`199` BIGINT,
|
||||
`200` BIGINT,
|
||||
`201` BIGINT,
|
||||
`202` BIGINT,
|
||||
`203` BIGINT,
|
||||
`204` BIGINT,
|
||||
`205` BIGINT,
|
||||
`206` BIGINT,
|
||||
`207` BIGINT,
|
||||
`208` BIGINT,
|
||||
`209` BIGINT,
|
||||
`210` BIGINT,
|
||||
`211` BIGINT,
|
||||
`212` BIGINT,
|
||||
`213` BIGINT,
|
||||
`214` BIGINT,
|
||||
`215` BIGINT,
|
||||
`216` BIGINT,
|
||||
`217` BIGINT,
|
||||
`218` BIGINT,
|
||||
`219` BIGINT,
|
||||
`220` BIGINT,
|
||||
`221` BIGINT,
|
||||
`222` BIGINT,
|
||||
`223` BIGINT,
|
||||
`224` BIGINT,
|
||||
`225` BIGINT,
|
||||
`226` BIGINT,
|
||||
`227` BIGINT,
|
||||
`228` BIGINT,
|
||||
`229` BIGINT,
|
||||
`230` BIGINT,
|
||||
`231` BIGINT,
|
||||
`232` BIGINT,
|
||||
`233` BIGINT,
|
||||
`234` BIGINT,
|
||||
`235` BIGINT,
|
||||
`236` BIGINT,
|
||||
`237` BIGINT,
|
||||
`238` BIGINT,
|
||||
`239` BIGINT,
|
||||
`240` BIGINT,
|
||||
`241` BIGINT,
|
||||
`242` BIGINT,
|
||||
`243` BIGINT,
|
||||
`244` BIGINT,
|
||||
`245` BIGINT,
|
||||
`246` BIGINT,
|
||||
`247` BIGINT,
|
||||
`248` BIGINT,
|
||||
`249` BIGINT
|
||||
)
|
||||
DISTRIBUTED BY HASH(`1`, `2`)
|
||||
PROPERTIES
|
||||
(
|
||||
"replication_allocation" = "tag.location.default: 1"
|
||||
);
|
||||
@ -0,0 +1 @@
|
||||
DROP TABLE IF EXISTS parquet_large_metadata_100mb;
|
||||
@ -0,0 +1,120 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
suite("test_parquet_large_metadata_load_p2", "p2") {
|
||||
|
||||
def tables = ["parquet_large_metadata_100mb" // metadata size more than 100MB
|
||||
]
|
||||
def paths = ["s3://doris-build-1308700295/regression/load/metadata/parquet_large_metadata_100mb.parquet"
|
||||
]
|
||||
String ak = getS3AK()
|
||||
String sk = getS3SK()
|
||||
String enabled = context.config.otherConfigs.get("enableBrokerLoad")
|
||||
|
||||
def expect_tvf_result = """[[2, 8], [2, 8], [2, 8], [2, 8], [2, 8]]"""
|
||||
String[][] tvf_result = sql """select `1`,`2` from s3(
|
||||
"uri" = "https://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/metadata/parquet_large_metadata_100mb.parquet",
|
||||
"s3.access_key" = "$ak",
|
||||
"s3.secret_key" = "$sk",
|
||||
"s3.region" = "ap-beijing",
|
||||
"format" = "parquet"
|
||||
) order by `1`,`2` limit 5;
|
||||
"""
|
||||
assertTrue("$tvf_result" == "$expect_tvf_result")
|
||||
|
||||
def do_load_job = { uuid, path, table ->
|
||||
sql """
|
||||
LOAD LABEL $uuid (
|
||||
APPEND
|
||||
DATA INFILE("$path")
|
||||
INTO TABLE $table
|
||||
FORMAT AS "PARQUET"
|
||||
)
|
||||
WITH S3 (
|
||||
"AWS_ACCESS_KEY" = "$ak",
|
||||
"AWS_SECRET_KEY" = "$sk",
|
||||
"AWS_ENDPOINT" = "cos.ap-beijing.myqcloud.com",
|
||||
"AWS_REGION" = "ap-beijing"
|
||||
)
|
||||
PROPERTIES
|
||||
(
|
||||
"strict_mode"="true"
|
||||
);
|
||||
"""
|
||||
logger.info("Submit load with lable: $uuid, table: $table, path: $path")
|
||||
}
|
||||
|
||||
def etl_info = ["unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=45000"]
|
||||
def task_info = ["cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0"]
|
||||
def error_msg = [""]
|
||||
// test unified load
|
||||
if (enabled != null && enabled.equalsIgnoreCase("true")) {
|
||||
def uuids = []
|
||||
try {
|
||||
def i = 0
|
||||
|
||||
for (String table in tables) {
|
||||
sql new File("""${context.file.parent}/ddl/${table}_drop.sql""").text
|
||||
sql new File("""${context.file.parent}/ddl/${table}_create.sql""").text
|
||||
|
||||
def uuid = UUID.randomUUID().toString().replace("-", "0")
|
||||
uuids.add(uuid)
|
||||
do_load_job.call(uuid, paths[i], table)
|
||||
i++
|
||||
}
|
||||
|
||||
i = 0
|
||||
for (String label in uuids) {
|
||||
def max_try_milli_secs = 600000
|
||||
while (max_try_milli_secs > 0) {
|
||||
String[][] result = sql """ show load where label="$label" order by createtime desc limit 1; """
|
||||
if (result[0][2].equals("FINISHED")) {
|
||||
logger.info("Load FINISHED " + label)
|
||||
assertTrue(result[0][6].contains(task_info[i]))
|
||||
assertTrue(etl_info[i] == result[0][5], "expected: " + etl_info[i] + ", actual: " + result[0][5] + ", label: $label")
|
||||
break;
|
||||
}
|
||||
if (result[0][2].equals("CANCELLED")) {
|
||||
assertTrue(result[0][6].contains(task_info[i]))
|
||||
assertTrue(result[0][7].contains(error_msg[i]))
|
||||
break;
|
||||
}
|
||||
Thread.sleep(1000)
|
||||
max_try_milli_secs -= 1000
|
||||
if(max_try_milli_secs <= 0) {
|
||||
assertTrue(1 == 2, "load Timeout: $label")
|
||||
}
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
def expect_result = """[[45000]]"""
|
||||
|
||||
for (String table in tables) {
|
||||
if (table.matches("parquet_large_metadata_100mb")) {
|
||||
String[][] actual_result = sql """select count(*) from parquet_large_metadata_100mb;"""
|
||||
assertTrue("$actual_result" == "$expect_result")
|
||||
}
|
||||
}
|
||||
|
||||
} finally {
|
||||
for (String table in tables) {
|
||||
sql new File("""${context.file.parent}/ddl/${table}_drop.sql""").text
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user