[improve](group commit) Group commit support chunked stream load in flink (#32135)
This commit is contained in:
@ -340,15 +340,19 @@ Status HttpStreamAction::process_put(HttpRequest* http_req,
|
||||
ctx->label = ctx->put_result.params.import_label;
|
||||
ctx->put_result.params.__set_wal_id(ctx->wal_id);
|
||||
if (http_req != nullptr && http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
|
||||
size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
|
||||
if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
|
||||
ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
|
||||
ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
|
||||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
|
||||
ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
|
||||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
|
||||
ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
|
||||
content_length *= 3;
|
||||
// FIXME find a way to avoid chunked stream load write large WALs
|
||||
size_t content_length = 0;
|
||||
if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
|
||||
content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
|
||||
if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
|
||||
ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
|
||||
ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
|
||||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
|
||||
ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
|
||||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
|
||||
ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
|
||||
content_length *= 3;
|
||||
}
|
||||
}
|
||||
ctx->put_result.params.__set_content_length(content_length);
|
||||
}
|
||||
@ -392,11 +396,19 @@ Status HttpStreamAction::_handle_group_commit(HttpRequest* req,
|
||||
LOG(WARNING) << ss.str();
|
||||
return Status::InternalError(ss.str());
|
||||
}
|
||||
if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") || content_length == 0) {
|
||||
// allow chunked stream load in flink
|
||||
auto is_chunk =
|
||||
!req->header(HttpHeaders::TRANSFER_ENCODING).empty() &&
|
||||
req->header(HttpHeaders::TRANSFER_ENCODING).find("chunked") != std::string::npos;
|
||||
if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") ||
|
||||
(content_length == 0 && !is_chunk)) {
|
||||
// off_mode and empty
|
||||
ctx->group_commit = false;
|
||||
return Status::OK();
|
||||
}
|
||||
if (is_chunk) {
|
||||
ctx->label = "";
|
||||
}
|
||||
|
||||
auto partial_columns = !req->header(HTTP_PARTIAL_COLUMNS).empty() &&
|
||||
iequal(req->header(HTTP_PARTIAL_COLUMNS), "true");
|
||||
|
||||
@ -646,15 +646,19 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
|
||||
return plan_status;
|
||||
}
|
||||
if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
|
||||
size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
|
||||
if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
|
||||
ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
|
||||
ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
|
||||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
|
||||
ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
|
||||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
|
||||
ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
|
||||
content_length *= 3;
|
||||
// FIXME find a way to avoid chunked stream load write large WALs
|
||||
size_t content_length = 0;
|
||||
if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
|
||||
content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
|
||||
if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
|
||||
ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
|
||||
ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
|
||||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
|
||||
ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
|
||||
ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
|
||||
ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
|
||||
content_length *= 3;
|
||||
}
|
||||
}
|
||||
ctx->put_result.params.__set_content_length(content_length);
|
||||
}
|
||||
@ -721,11 +725,18 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
|
||||
LOG(WARNING) << ss.str();
|
||||
return Status::InternalError(ss.str());
|
||||
}
|
||||
if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") || content_length == 0) {
|
||||
// allow chunked stream load in flink
|
||||
auto is_chunk = !req->header(HttpHeaders::TRANSFER_ENCODING).empty() &&
|
||||
req->header(HttpHeaders::TRANSFER_ENCODING).find(CHUNK) != std::string::npos;
|
||||
if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") ||
|
||||
(content_length == 0 && !is_chunk)) {
|
||||
// off_mode and empty
|
||||
ctx->group_commit = false;
|
||||
return Status::OK();
|
||||
}
|
||||
if (is_chunk) {
|
||||
ctx->label = "";
|
||||
}
|
||||
|
||||
auto partial_columns = !req->header(HTTP_PARTIAL_COLUMNS).empty() &&
|
||||
iequal(req->header(HTTP_PARTIAL_COLUMNS), "true");
|
||||
|
||||
Reference in New Issue
Block a user