diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index 272ef928a4..8e6bc204fe 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -340,15 +340,19 @@ Status HttpStreamAction::process_put(HttpRequest* http_req, ctx->label = ctx->put_result.params.import_label; ctx->put_result.params.__set_wal_id(ctx->wal_id); if (http_req != nullptr && http_req->header(HTTP_GROUP_COMMIT) == "async_mode") { - size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); - if (ctx->format == TFileFormatType::FORMAT_CSV_GZ || - ctx->format == TFileFormatType::FORMAT_CSV_LZO || - ctx->format == TFileFormatType::FORMAT_CSV_BZ2 || - ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME || - ctx->format == TFileFormatType::FORMAT_CSV_LZOP || - ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK || - ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) { - content_length *= 3; + // FIXME find a way to avoid chunked stream load write large WALs + size_t content_length = 0; + if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) { + content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); + if (ctx->format == TFileFormatType::FORMAT_CSV_GZ || + ctx->format == TFileFormatType::FORMAT_CSV_LZO || + ctx->format == TFileFormatType::FORMAT_CSV_BZ2 || + ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME || + ctx->format == TFileFormatType::FORMAT_CSV_LZOP || + ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK || + ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) { + content_length *= 3; + } } ctx->put_result.params.__set_content_length(content_length); } @@ -392,11 +396,19 @@ Status HttpStreamAction::_handle_group_commit(HttpRequest* req, LOG(WARNING) << ss.str(); return Status::InternalError(ss.str()); } - if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") || content_length == 0) { + // allow chunked stream load in flink + auto is_chunk = + !req->header(HttpHeaders::TRANSFER_ENCODING).empty() && + req->header(HttpHeaders::TRANSFER_ENCODING).find("chunked") != std::string::npos; + if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") || + (content_length == 0 && !is_chunk)) { // off_mode and empty ctx->group_commit = false; return Status::OK(); } + if (is_chunk) { + ctx->label = ""; + } auto partial_columns = !req->header(HTTP_PARTIAL_COLUMNS).empty() && iequal(req->header(HTTP_PARTIAL_COLUMNS), "true"); diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 89be3ae373..a3c7554ea1 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -646,15 +646,19 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, return plan_status; } if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") { - size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); - if (ctx->format == TFileFormatType::FORMAT_CSV_GZ || - ctx->format == TFileFormatType::FORMAT_CSV_LZO || - ctx->format == TFileFormatType::FORMAT_CSV_BZ2 || - ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME || - ctx->format == TFileFormatType::FORMAT_CSV_LZOP || - ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK || - ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) { - content_length *= 3; + // FIXME find a way to avoid chunked stream load write large WALs + size_t content_length = 0; + if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) { + content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); + if (ctx->format == TFileFormatType::FORMAT_CSV_GZ || + ctx->format == TFileFormatType::FORMAT_CSV_LZO || + ctx->format == TFileFormatType::FORMAT_CSV_BZ2 || + ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME || + ctx->format == TFileFormatType::FORMAT_CSV_LZOP || + ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK || + ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) { + content_length *= 3; + } } ctx->put_result.params.__set_content_length(content_length); } @@ -721,11 +725,18 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req, LOG(WARNING) << ss.str(); return Status::InternalError(ss.str()); } - if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") || content_length == 0) { + // allow chunked stream load in flink + auto is_chunk = !req->header(HttpHeaders::TRANSFER_ENCODING).empty() && + req->header(HttpHeaders::TRANSFER_ENCODING).find(CHUNK) != std::string::npos; + if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") || + (content_length == 0 && !is_chunk)) { // off_mode and empty ctx->group_commit = false; return Status::OK(); } + if (is_chunk) { + ctx->label = ""; + } auto partial_columns = !req->header(HTTP_PARTIAL_COLUMNS).empty() && iequal(req->header(HTTP_PARTIAL_COLUMNS), "true");