diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp index 9096e15e0e..5f6c789f76 100644 --- a/be/src/runtime/routine_load/data_consumer_group.cpp +++ b/be/src/runtime/routine_load/data_consumer_group.cpp @@ -146,19 +146,10 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) { return result_st; } - if (left_bytes == ctx->max_batch_size) { - // nothing to be consumed, we have to cancel it, because - // we do not allow finishing stream load pipe without data - kafka_pipe->cancel("no data"); - return Status::Cancelled("Cancelled"); - } else { - DCHECK(left_bytes < ctx->max_batch_size); - DCHECK(left_rows < ctx->max_batch_rows); - kafka_pipe->finish(); - ctx->kafka_info->cmt_offset = std::move(cmt_offset); - ctx->receive_bytes = ctx->max_batch_size - left_bytes; - return Status::OK(); - } + kafka_pipe->finish(); + ctx->kafka_info->cmt_offset = std::move(cmt_offset); + ctx->receive_bytes = ctx->max_batch_size - left_bytes; + return Status::OK(); } RdKafka::Message* msg;