[enhancement](streamload) make stream load context as shared ptr and save it in global load mgr (#16996)
This commit is contained in:
@ -36,7 +36,7 @@ namespace doris {
|
||||
static const std::string PROP_GROUP_ID = "group.id";
|
||||
// init kafka consumer will only set common configs such as
|
||||
// brokers, groupid
|
||||
Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
|
||||
Status KafkaDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) {
|
||||
std::unique_lock<std::mutex> l(_lock);
|
||||
if (_init) {
|
||||
// this consumer has already been initialized.
|
||||
@ -139,7 +139,7 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
|
||||
|
||||
Status KafkaDataConsumer::assign_topic_partitions(
|
||||
const std::map<int32_t, int64_t>& begin_partition_offset, const std::string& topic,
|
||||
StreamLoadContext* ctx) {
|
||||
std::shared_ptr<StreamLoadContext> ctx) {
|
||||
DCHECK(_k_consumer);
|
||||
// create TopicPartitions
|
||||
std::stringstream ss;
|
||||
@ -380,7 +380,7 @@ Status KafkaDataConsumer::get_latest_offsets_for_partitions(
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status KafkaDataConsumer::cancel(StreamLoadContext* ctx) {
|
||||
Status KafkaDataConsumer::cancel(std::shared_ptr<StreamLoadContext> ctx) {
|
||||
std::unique_lock<std::mutex> l(_lock);
|
||||
if (!_init) {
|
||||
return Status::InternalError("consumer is not initialized");
|
||||
@ -413,7 +413,7 @@ Status KafkaDataConsumer::commit(std::vector<RdKafka::TopicPartition*>& offset)
|
||||
|
||||
// if the kafka brokers and topic are same,
|
||||
// we considered this consumer as matched, thus can be reused.
|
||||
bool KafkaDataConsumer::match(StreamLoadContext* ctx) {
|
||||
bool KafkaDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) {
|
||||
if (ctx->load_src_type != TLoadSourceType::KAFKA) {
|
||||
return false;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user