[fix](stream_load)fix bug for stream (#27752)
1. forbid thed stream_load without content-length or chunked Transfer Encoding 2. forbid thed stream_load both with content-length and chunked Transfer Encoding Co-authored-by: xingying01 <xingying01@corp.netease.com>
This commit is contained in:
@ -326,6 +326,18 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea
|
||||
ctx->is_chunked_transfer = true;
|
||||
}
|
||||
}
|
||||
if (UNLIKELY((http_req->header(HttpHeaders::CONTENT_LENGTH).empty() &&
|
||||
!ctx->is_chunked_transfer))) {
|
||||
LOG(WARNING) << "content_length is empty and transfer-encoding!=chunked, please set "
|
||||
"content_length or transfer-encoding=chunked";
|
||||
return Status::InternalError(
|
||||
"content_length is empty and transfer-encoding!=chunked, please set content_length "
|
||||
"or transfer-encoding=chunked");
|
||||
} else if (UNLIKELY(!http_req->header(HttpHeaders::CONTENT_LENGTH).empty() &&
|
||||
ctx->is_chunked_transfer)) {
|
||||
LOG(WARNING) << "please do not set both content_length and transfer-encoding";
|
||||
return Status::InternalError("please do not set both content_length and transfer-encoding");
|
||||
}
|
||||
|
||||
if (!http_req->header(HTTP_TIMEOUT).empty()) {
|
||||
try {
|
||||
|
||||
@ -1575,5 +1575,76 @@ suite("test_stream_load", "p0") {
|
||||
} finally {
|
||||
sql """ DROP TABLE IF EXISTS ${tableName19} FORCE"""
|
||||
}
|
||||
|
||||
def sql_result = sql """ select Host, HttpPort from backends() where alive = true limit 1; """
|
||||
|
||||
log.info(sql_result[0][0].toString())
|
||||
log.info(sql_result[0][1].toString())
|
||||
log.info(sql_result[0].size.toString())
|
||||
|
||||
def beHost=sql_result[0][0]
|
||||
def beHttpPort=sql_result[0][1]
|
||||
log.info("${beHost}".toString())
|
||||
log.info("${beHttpPort}".toString());
|
||||
|
||||
//test be : chunked transfer + Content Length
|
||||
try {
|
||||
sql """ DROP TABLE IF EXISTS ${tableName16} """
|
||||
sql """
|
||||
CREATE TABLE IF NOT EXISTS ${tableName16} (
|
||||
`k1` bigint(20) NULL DEFAULT "1",
|
||||
`k2` bigint(20) NULL ,
|
||||
`v1` tinyint(4) NULL,
|
||||
`v2` tinyint(4) NULL,
|
||||
`v3` tinyint(4) NULL,
|
||||
`v4` DATETIME NULL
|
||||
) ENGINE=OLAP
|
||||
DISTRIBUTED BY HASH(`k1`) BUCKETS 3
|
||||
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
|
||||
"""
|
||||
|
||||
def command = "curl --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword} -H column_separator:| -H ${db}:${tableName16} -H Content-Length:0 -H Transfer-Encoding:chunked -H columns:k1,k2,v1,v2,v3 -T ${context.dataPath}/test_chunked_transfer.csv http://${beHost}:${beHttpPort}/api/${db}/${tableName16}/_stream_load"
|
||||
log.info("test chunked transfer command: ${command}")
|
||||
def process = command.execute()
|
||||
code = process.waitFor()
|
||||
out = process.text
|
||||
log.info("test chunked transfer result: ${out}".toString())
|
||||
def json = parseJson(out)
|
||||
assertEquals("fail", json.Status.toLowerCase())
|
||||
assertTrue(json.Message.contains("[INTERNAL_ERROR]please do not set both content_length and transfer-encoding"))
|
||||
} finally {
|
||||
sql """ DROP TABLE IF EXISTS ${tableName16} FORCE"""
|
||||
}
|
||||
|
||||
|
||||
//test be : no chunked transfer + no Content Length
|
||||
try {
|
||||
sql """ DROP TABLE IF EXISTS ${tableName16} """
|
||||
sql """
|
||||
CREATE TABLE IF NOT EXISTS ${tableName16} (
|
||||
`k1` bigint(20) NULL DEFAULT "1",
|
||||
`k2` bigint(20) NULL ,
|
||||
`v1` tinyint(4) NULL,
|
||||
`v2` tinyint(4) NULL,
|
||||
`v3` tinyint(4) NULL,
|
||||
`v4` DATETIME NULL
|
||||
) ENGINE=OLAP
|
||||
DISTRIBUTED BY HASH(`k1`) BUCKETS 3
|
||||
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
|
||||
"""
|
||||
|
||||
def command = "curl --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword} -H column_separator:| -H ${db}:${tableName16} -H Content-Length: -H Transfer-Encoding: -T ${context.dataPath}/test_chunked_transfer.csv http://${beHost}:${beHttpPort}/api/${db}/${tableName16}/_stream_load"
|
||||
log.info("test chunked transfer command: ${command}")
|
||||
def process = command.execute()
|
||||
code = process.waitFor()
|
||||
out = process.text
|
||||
log.info("test chunked transfer result: ${out}".toString())
|
||||
def json = parseJson(out)
|
||||
assertEquals("fail", json.Status.toLowerCase())
|
||||
assertTrue(json.Message.contains("[INTERNAL_ERROR]content_length is empty and transfer-encoding!=chunked, please set content_length or transfer-encoding=chunked"))
|
||||
} finally {
|
||||
sql """ DROP TABLE IF EXISTS ${tableName16} FORCE"""
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user