[improvement](routine-load) Support routine load task succeed with empty data consumed (#8256)
This commit is contained in:
@ -146,19 +146,10 @@ Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) {
|
||||
return result_st;
|
||||
}
|
||||
|
||||
if (left_bytes == ctx->max_batch_size) {
|
||||
// nothing to be consumed, we have to cancel it, because
|
||||
// we do not allow finishing stream load pipe without data
|
||||
kafka_pipe->cancel("no data");
|
||||
return Status::Cancelled("Cancelled");
|
||||
} else {
|
||||
DCHECK(left_bytes < ctx->max_batch_size);
|
||||
DCHECK(left_rows < ctx->max_batch_rows);
|
||||
kafka_pipe->finish();
|
||||
ctx->kafka_info->cmt_offset = std::move(cmt_offset);
|
||||
ctx->receive_bytes = ctx->max_batch_size - left_bytes;
|
||||
return Status::OK();
|
||||
}
|
||||
kafka_pipe->finish();
|
||||
ctx->kafka_info->cmt_offset = std::move(cmt_offset);
|
||||
ctx->receive_bytes = ctx->max_batch_size - left_bytes;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
RdKafka::Message* msg;
|
||||
|
||||
Reference in New Issue
Block a user