From 538df287372f42b69a01bf44e765477e55c5409b Mon Sep 17 00:00:00 2001 From: caiconghui <55968745+caiconghui@users.noreply.github.com> Date: Thu, 3 Mar 2022 22:35:50 +0800 Subject: [PATCH] [improvement](routine-load) Support routine load task succeed with empty data consumed (#8256) --- .../routine_load/data_consumer_group.cpp | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp index 9096e15e0e..5f6c789f76 100644 --- a/be/src/runtime/routine_load/data_consumer_group.cpp +++ b/be/src/runtime/routine_load/data_consumer_group.cpp @@ -146,19 +146,10 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) { return result_st; } - if (left_bytes == ctx->max_batch_size) { - // nothing to be consumed, we have to cancel it, because - // we do not allow finishing stream load pipe without data - kafka_pipe->cancel("no data"); - return Status::Cancelled("Cancelled"); - } else { - DCHECK(left_bytes < ctx->max_batch_size); - DCHECK(left_rows < ctx->max_batch_rows); - kafka_pipe->finish(); - ctx->kafka_info->cmt_offset = std::move(cmt_offset); - ctx->receive_bytes = ctx->max_batch_size - left_bytes; - return Status::OK(); - } + kafka_pipe->finish(); + ctx->kafka_info->cmt_offset = std::move(cmt_offset); + ctx->receive_bytes = ctx->max_batch_size - left_bytes; + return Status::OK(); } RdKafka::Message* msg;