[fix](multi-table) fix single stream multi table load can not finish (#25379)

Author: HHoflittlefish777
Date: 2023-10-13 15:47:16 +08:00
Committed by: GitHub
Parent: 283bd59eba
Commit: 2ec53ff60e

3 changed files with 14 additions and 8 deletions

View File

@@ -25,7 +25,6 @@
 #include <utility>
 
 #include "common/logging.h"
-#include "io/fs/kafka_consumer_pipe.h"
 #include "librdkafka/rdkafkacpp.h"
 #include "runtime/routine_load/data_consumer.h"
 #include "runtime/stream_load/stream_load_context.h"
@@ -72,7 +71,8 @@ KafkaDataConsumerGroup::~KafkaDataConsumerGroup() {
     DCHECK(_queue.get_size() == 0);
 }
 
-Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx) {
+Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx,
+                                         std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) {
     Status result_st = Status::OK();
     // start all consumers
     for (auto& consumer : _consumers) {
@@ -105,9 +105,6 @@ Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx)
     int64_t left_rows = ctx->max_batch_rows;
     int64_t left_bytes = ctx->max_batch_size;
 
-    std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe =
-            std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink);
-
     LOG(INFO) << "start consumer group: " << _grp_id << ". max time(ms): " << left_time
               << ", batch rows: " << left_rows << ", batch size: " << left_bytes << ". "
               << ctx->brief();
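
The hunks above move the KafkaConsumerPipe out of KafkaDataConsumerGroup::start_all: instead of re-deriving it inside the method with a static_pointer_cast of ctx->body_sink, the method now receives the pipe from its caller, so the caller and the consumer group operate on the same object and the caller can keep using it after consumption ends. A minimal sketch of that hand-the-dependency-in pattern, using simplified stand-in types rather than the real Doris classes:

#include <memory>
#include <string>

// Stand-in for io::KafkaConsumerPipe (illustrative only, not the Doris class).
struct FakePipe {
    void append(const std::string& msg) { /* buffer one consumed message */ }
    void finish() { /* mark the write side as complete */ }
};

// Stand-in for KafkaDataConsumerGroup.
class FakeConsumerGroup {
public:
    // The pipe is an explicit parameter: the caller resolves it once and
    // hands it in, instead of the group casting ctx->body_sink itself.
    void start_all(const std::shared_ptr<FakePipe>& kafka_pipe) {
        kafka_pipe->append("row-1"); // consume loop elided
        // finish() is intentionally not called here; for multi-table loads
        // the caller still has planning work to do before closing the pipe.
    }
};

int main() {
    auto pipe = std::make_shared<FakePipe>();
    FakeConsumerGroup grp;
    grp.start_all(pipe); // caller keeps `pipe` and can finish it later
    pipe->finish();
}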

View File

@@ -25,6 +25,7 @@
 #include <vector>
 
 #include "common/status.h"
+#include "io/fs/kafka_consumer_pipe.h"
 #include "runtime/routine_load/data_consumer.h"
 #include "util/blocking_queue.hpp"
 #include "util/uid_util.h"
@@ -60,7 +61,10 @@ public:
     }
 
     // start all consumers
-    virtual Status start_all(std::shared_ptr<StreamLoadContext> ctx) { return Status::OK(); }
+    virtual Status start_all(std::shared_ptr<StreamLoadContext> ctx,
+                             std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) {
+        return Status::OK();
+    }
 
 protected:
     UniqueId _grp_id;
@@ -82,7 +86,8 @@ public:
     virtual ~KafkaDataConsumerGroup();
 
-    Status start_all(std::shared_ptr<StreamLoadContext> ctx) override;
+    Status start_all(std::shared_ptr<StreamLoadContext> ctx,
+                     std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) override;
 
     // assign topic partitions to all consumers equally
     Status assign_topic_partitions(std::shared_ptr<StreamLoadContext> ctx);
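
In the header, the old single-argument virtual on the base DataConsumerGroup is replaced by the two-argument form (with a no-op default body), and KafkaDataConsumerGroup overrides it. Both declarations have to change together because C++ virtual dispatch only reaches an override whose parameter list matches the base declaration. The sketch below shows the shape of that relationship with hypothetical stand-in types and a void return instead of Status, purely for illustration:

#include <iostream>
#include <memory>

struct Pipe {}; // stand-in for io::KafkaConsumerPipe

// Stand-in for DataConsumerGroup: the base declares the same parameter list
// that the Kafka subclass implements, so calls made through a base pointer
// reach the subclass version.
class BaseGroup {
public:
    virtual ~BaseGroup() = default;
    virtual void start_all(std::shared_ptr<Pipe> pipe) {
        std::cout << "base default: no-op\n"; // used by non-Kafka groups
    }
};

// Stand-in for KafkaDataConsumerGroup.
class KafkaGroup : public BaseGroup {
public:
    // `override` makes the compiler check that the signature matches the base.
    void start_all(std::shared_ptr<Pipe> pipe) override {
        std::cout << "kafka: consume into the supplied pipe\n";
    }
};

int main() {
    std::shared_ptr<BaseGroup> grp = std::make_shared<KafkaGroup>();
    grp->start_all(std::make_shared<Pipe>()); // dispatches to KafkaGroup
}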

View File

@@ -336,8 +336,11 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
 #endif
     }
 
+    std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe =
+            std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink);
+
     // start to consume, this may block a while
-    HANDLE_ERROR(consumer_grp->start_all(ctx), "consuming failed");
+    HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming failed");
 
     if (ctx->is_multi_table) {
         // plan the rest of unplanned data
@@ -346,6 +349,7 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
                      "multi tables task executes plan error");
         // need memory order
         multi_table_pipe->set_consume_finished();
+        HANDLE_ERROR(kafka_pipe->finish(), "finish multi table task failed");
     }
 
     // wait for all consumers finished
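
The final hunk is what actually lets a single-stream multi-table task complete: after the remaining unplanned data has been planned, the executor now calls finish() on the pipe it obtained before starting the consumers. The sketch below shows the general reason such a call matters for pipe-style sinks, using hypothetical names rather than Doris's real StreamLoadPipe API: a blocking reader can only tell "no data yet" apart from "no more data ever" once the writer marks the pipe finished, so without this call the reading side waits forever and the load never finishes, which matches the symptom in the commit title.

#include <condition_variable>
#include <deque>
#include <iostream>
#include <mutex>
#include <optional>
#include <string>
#include <thread>

// Hypothetical minimal pipe; NOT Doris's StreamLoadPipe, just the same idea.
class MiniPipe {
public:
    void append(std::string row) {
        { std::lock_guard<std::mutex> l(_m); _rows.push_back(std::move(row)); }
        _cv.notify_one();
    }
    // Writer signals that no further rows will ever arrive.
    void finish() {
        { std::lock_guard<std::mutex> l(_m); _finished = true; }
        _cv.notify_all();
    }
    // Blocks until a row is available or the pipe is finished; returns
    // std::nullopt (EOF) only after finish() has been called and the
    // buffer is drained.
    std::optional<std::string> read() {
        std::unique_lock<std::mutex> l(_m);
        _cv.wait(l, [&] { return !_rows.empty() || _finished; });
        if (_rows.empty()) return std::nullopt;
        std::string row = std::move(_rows.front());
        _rows.pop_front();
        return row;
    }

private:
    std::mutex _m;
    std::condition_variable _cv;
    std::deque<std::string> _rows;
    bool _finished = false;
};

int main() {
    MiniPipe pipe;
    std::thread reader([&] {
        while (auto row = pipe.read()) std::cout << "got " << *row << '\n';
        std::cout << "reader done\n"; // reached only because finish() ran
    });
    pipe.append("row-1");
    pipe.append("row-2");
    pipe.finish(); // without this, reader.join() below would hang forever
    reader.join();
}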