diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp
index ed0f170656..60e7c57a6c 100644
--- a/be/src/runtime/routine_load/data_consumer_group.cpp
+++ b/be/src/runtime/routine_load/data_consumer_group.cpp
@@ -25,7 +25,6 @@
 #include <memory>

 #include "common/logging.h"
-#include "io/fs/kafka_consumer_pipe.h"
 #include "librdkafka/rdkafkacpp.h"
 #include "runtime/routine_load/data_consumer.h"
 #include "runtime/stream_load/stream_load_context.h"
@@ -72,7 +71,8 @@ KafkaDataConsumerGroup::~KafkaDataConsumerGroup() {
     DCHECK(_queue.get_size() == 0);
 }

-Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx) {
+Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx,
+                                         std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) {
     Status result_st = Status::OK();
     // start all consumers
     for (auto& consumer : _consumers) {
@@ -105,9 +105,6 @@ Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx)
     int64_t left_rows = ctx->max_batch_rows;
     int64_t left_bytes = ctx->max_batch_size;

-    std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe =
-            std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink);
-
     LOG(INFO) << "start consumer group: " << _grp_id << ". max time(ms): " << left_time
               << ", batch rows: " << left_rows << ", batch size: " << left_bytes << ". "
               << ctx->brief();
diff --git a/be/src/runtime/routine_load/data_consumer_group.h b/be/src/runtime/routine_load/data_consumer_group.h
index e7be39f5a6..e15ad7115f 100644
--- a/be/src/runtime/routine_load/data_consumer_group.h
+++ b/be/src/runtime/routine_load/data_consumer_group.h
@@ -25,6 +25,7 @@
 #include <memory>

 #include "common/status.h"
+#include "io/fs/kafka_consumer_pipe.h"
 #include "runtime/routine_load/data_consumer.h"
 #include "util/blocking_queue.hpp"
 #include "util/uid_util.h"
@@ -60,7 +61,10 @@ public:
     }

     // start all consumers
-    virtual Status start_all(std::shared_ptr<StreamLoadContext> ctx) { return Status::OK(); }
+    virtual Status start_all(std::shared_ptr<StreamLoadContext> ctx,
+                             std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) {
+        return Status::OK();
+    }

 protected:
     UniqueId _grp_id;
@@ -82,7 +86,8 @@ public:

     virtual ~KafkaDataConsumerGroup();

-    Status start_all(std::shared_ptr<StreamLoadContext> ctx) override;
+    Status start_all(std::shared_ptr<StreamLoadContext> ctx,
+                     std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) override;

     // assign topic partitions to all consumers equally
     Status assign_topic_partitions(std::shared_ptr<StreamLoadContext> ctx);
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 49958400e7..445e78e06f 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -336,8 +336,11 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
 #endif
     }

+    std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe =
+            std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink);
+
     // start to consume, this may block a while
-    HANDLE_ERROR(consumer_grp->start_all(ctx), "consuming failed");
+    HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming failed");

     if (ctx->is_multi_table) {
         // plan the rest of unplanned data
@@ -346,6 +349,7 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
             "multi tables task executes plan error");
         // need memory order
         multi_table_pipe->set_consume_finished();
+        HANDLE_ERROR(kafka_pipe->finish(), "finish multi table task failed");
     }

     // wait for all consumers finished