From 03a4fe6f39d86419b05697db2d45857fc9f5de89 Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Fri, 24 Feb 2023 11:15:29 +0800 Subject: [PATCH] [enhancement](streamload) make stream load context as shared ptr and save it in global load mgr (#16996) --- be/src/http/action/stream_load.cpp | 42 +++++----- be/src/http/action/stream_load.h | 10 +-- be/src/http/action/stream_load_2pc.cpp | 20 +---- be/src/http/action/stream_load_2pc.h | 1 - be/src/http/http_handler.h | 4 +- be/src/http/http_request.h | 7 +- be/src/io/file_factory.cpp | 6 +- be/src/runtime/fragment_mgr.cpp | 7 +- be/src/runtime/routine_load/data_consumer.cpp | 8 +- be/src/runtime/routine_load/data_consumer.h | 27 ++++--- .../routine_load/data_consumer_group.cpp | 4 +- .../routine_load/data_consumer_group.h | 6 +- .../routine_load/data_consumer_pool.cpp | 5 +- .../runtime/routine_load/data_consumer_pool.h | 5 +- .../routine_load_task_executor.cpp | 78 ++++++++----------- .../routine_load/routine_load_task_executor.h | 16 ++-- .../runtime/stream_load/new_load_stream_mgr.h | 15 ++-- .../runtime/stream_load/stream_load_context.h | 12 +-- .../stream_load/stream_load_executor.cpp | 13 +--- .../stream_load/stream_load_executor.h | 2 +- 20 files changed, 130 insertions(+), 158 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index a55c0e22c2..c0fc86b68b 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -144,7 +144,8 @@ StreamLoadAction::~StreamLoadAction() { } void StreamLoadAction::handle(HttpRequest* req) { - StreamLoadContext* ctx = (StreamLoadContext*)req->handler_ctx(); + std::shared_ptr ctx = + std::static_pointer_cast(req->handler_ctx()); if (ctx == nullptr) { return; } @@ -161,7 +162,7 @@ void StreamLoadAction::handle(HttpRequest* req) { if (!ctx->status.ok() && !ctx->status.is()) { if (ctx->need_rollback) { - _exec_env->stream_load_executor()->rollback_txn(ctx); + _exec_env->stream_load_executor()->rollback_txn(ctx.get()); ctx->need_rollback = false; } if (ctx->body_sink.get() != nullptr) { @@ -185,7 +186,7 @@ void StreamLoadAction::handle(HttpRequest* req) { streaming_load_current_processing->increment(-1); } -Status StreamLoadAction::_handle(StreamLoadContext* ctx) { +Status StreamLoadAction::_handle(std::shared_ptr ctx) { if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) { LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id; @@ -206,12 +207,12 @@ Status StreamLoadAction::_handle(StreamLoadContext* ctx) { if (ctx->two_phase_commit) { int64_t pre_commit_start_time = MonotonicNanos(); - RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx)); + RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get())); ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time; } else { // If put file success we need commit this load int64_t commit_and_publish_start_time = MonotonicNanos(); - RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx)); + RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get())); ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time; } return Status::OK(); @@ -220,8 +221,7 @@ Status StreamLoadAction::_handle(StreamLoadContext* ctx) { int StreamLoadAction::on_header(HttpRequest* req) { streaming_load_current_processing->increment(1); - StreamLoadContext* ctx = new StreamLoadContext(_exec_env); - ctx->ref(); + std::shared_ptr ctx = std::make_shared(_exec_env); req->set_handler_ctx(ctx); ctx->load_type = TLoadType::MANUL_LOAD; @@ -243,7 +243,7 @@ int StreamLoadAction::on_header(HttpRequest* req) { if (!st.ok()) { ctx->status = std::move(st); if (ctx->need_rollback) { - _exec_env->stream_load_executor()->rollback_txn(ctx); + _exec_env->stream_load_executor()->rollback_txn(ctx.get()); ctx->need_rollback = false; } if (ctx->body_sink.get() != nullptr) { @@ -265,7 +265,7 @@ int StreamLoadAction::on_header(HttpRequest* req) { return 0; } -Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ctx) { +Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr ctx) { // auth information if (!parse_basic_auth(*http_req, &ctx->auth)) { LOG(WARNING) << "parse basic authorization failed." << ctx->brief(); @@ -334,7 +334,7 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct // begin transaction int64_t begin_txn_start_time = MonotonicNanos(); - RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx)); + RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx.get())); ctx->begin_txn_cost_nanos = MonotonicNanos() - begin_txn_start_time; // process put file @@ -342,7 +342,8 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct } void StreamLoadAction::on_chunk_data(HttpRequest* req) { - StreamLoadContext* ctx = (StreamLoadContext*)req->handler_ctx(); + std::shared_ptr ctx = + std::static_pointer_cast(req->handler_ctx()); if (ctx == nullptr || !ctx->status.ok()) { return; } @@ -367,8 +368,8 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) { ctx->read_data_cost_nanos += (MonotonicNanos() - start_read_data_time); } -void StreamLoadAction::free_handler_ctx(void* param) { - StreamLoadContext* ctx = (StreamLoadContext*)param; +void StreamLoadAction::free_handler_ctx(std::shared_ptr param) { + std::shared_ptr ctx = std::static_pointer_cast(param); if (ctx == nullptr) { return; } @@ -376,12 +377,12 @@ void StreamLoadAction::free_handler_ctx(void* param) { if (ctx->body_sink != nullptr) { ctx->body_sink->cancel("sender is gone"); } - if (ctx->unref()) { - delete ctx; - } + // remove stream load context from stream load manager and the resource will be released + ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id); } -Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* ctx) { +Status StreamLoadAction::_process_put(HttpRequest* http_req, + std::shared_ptr ctx) { // Now we use stream ctx->use_streaming = is_format_support_streaming(ctx->format); @@ -399,10 +400,10 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* auto pipe = std::make_shared( io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, ctx->body_bytes /* total_length */); - RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, pipe)); - request.fileType = TFileType::FILE_STREAM; ctx->body_sink = pipe; + ctx->pipe = pipe; + RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx)); } else { RETURN_IF_ERROR(_data_saved_path(http_req, &request.path)); auto file_sink = std::make_shared(request.path); @@ -621,7 +622,8 @@ Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_pa return Status::OK(); } -void StreamLoadAction::_save_stream_load_record(StreamLoadContext* ctx, const std::string& str) { +void StreamLoadAction::_save_stream_load_record(std::shared_ptr ctx, + const std::string& str) { auto stream_load_recorder = StorageEngine::instance()->get_stream_load_recorder(); if (stream_load_recorder != nullptr) { std::string key = diff --git a/be/src/http/action/stream_load.h b/be/src/http/action/stream_load.h index c7b7a7d142..2dc3c762ad 100644 --- a/be/src/http/action/stream_load.h +++ b/be/src/http/action/stream_load.h @@ -42,14 +42,14 @@ public: int on_header(HttpRequest* req) override; void on_chunk_data(HttpRequest* req) override; - void free_handler_ctx(void* ctx) override; + void free_handler_ctx(std::shared_ptr ctx) override; private: - Status _on_header(HttpRequest* http_req, StreamLoadContext* ctx); - Status _handle(StreamLoadContext* ctx); + Status _on_header(HttpRequest* http_req, std::shared_ptr ctx); + Status _handle(std::shared_ptr ctx); Status _data_saved_path(HttpRequest* req, std::string* file_path); - Status _process_put(HttpRequest* http_req, StreamLoadContext* ctx); - void _save_stream_load_record(StreamLoadContext* ctx, const std::string& str); + Status _process_put(HttpRequest* http_req, std::shared_ptr ctx); + void _save_stream_load_record(std::shared_ptr ctx, const std::string& str); private: ExecEnv* _exec_env; diff --git a/be/src/http/action/stream_load_2pc.cpp b/be/src/http/action/stream_load_2pc.cpp index bcc4d9f288..3a5690cb8f 100644 --- a/be/src/http/action/stream_load_2pc.cpp +++ b/be/src/http/action/stream_load_2pc.cpp @@ -40,9 +40,7 @@ void StreamLoad2PCAction::handle(HttpRequest* req) { Status status = Status::OK(); std::string status_result; - StreamLoadContext* ctx = new StreamLoadContext(_exec_env); - ctx->ref(); - req->set_handler_ctx(ctx); + std::shared_ptr ctx = std::make_shared(_exec_env); ctx->db = req->param(HTTP_DB_KEY); std::string req_txn_id = req->header(HTTP_TXN_ID_KEY); try { @@ -66,7 +64,7 @@ void StreamLoad2PCAction::handle(HttpRequest* req) { status = Status::InternalError("no valid Basic authorization"); } - status = _exec_env->stream_load_executor()->operate_txn_2pc(ctx); + status = _exec_env->stream_load_executor()->operate_txn_2pc(ctx.get()); if (!status.ok()) { status_result = status.to_json(); @@ -93,18 +91,4 @@ std::string StreamLoad2PCAction::get_success_info(const std::string txn_id, return s.GetString(); } -void StreamLoad2PCAction::free_handler_ctx(void* param) { - StreamLoadContext* ctx = (StreamLoadContext*)param; - if (ctx == nullptr) { - return; - } - // sender is gone, make receiver know it - if (ctx->body_sink != nullptr) { - ctx->body_sink->cancel("sender is gone"); - } - if (ctx->unref()) { - delete ctx; - } -} - } // namespace doris diff --git a/be/src/http/action/stream_load_2pc.h b/be/src/http/action/stream_load_2pc.h index 960850e20e..1a1013a12b 100644 --- a/be/src/http/action/stream_load_2pc.h +++ b/be/src/http/action/stream_load_2pc.h @@ -33,7 +33,6 @@ public: void handle(HttpRequest* req) override; std::string get_success_info(const std::string txn_id, const std::string txn_operation); - void free_handler_ctx(void* param) override; private: ExecEnv* _exec_env; diff --git a/be/src/http/http_handler.h b/be/src/http/http_handler.h index c2c108f1a0..f8058fb8a6 100644 --- a/be/src/http/http_handler.h +++ b/be/src/http/http_handler.h @@ -17,6 +17,8 @@ #pragma once +#include + namespace doris { class HttpRequest; @@ -37,7 +39,7 @@ public: virtual int on_header(HttpRequest* req) { return 0; } virtual void on_chunk_data(HttpRequest* req) {} - virtual void free_handler_ctx(void* handler_ctx) {} + virtual void free_handler_ctx(std::shared_ptr handler_ctx) {} }; } // namespace doris diff --git a/be/src/http/http_request.h b/be/src/http/http_request.h index 1503e4303a..a26be3a22c 100644 --- a/be/src/http/http_request.h +++ b/be/src/http/http_request.h @@ -21,6 +21,7 @@ #include #include +#include #include #include "http/http_common.h" @@ -74,8 +75,8 @@ public: struct evhttp_request* get_evhttp_request() const { return _ev_req; } - void* handler_ctx() const { return _handler_ctx; } - void set_handler_ctx(void* ctx) { + std::shared_ptr handler_ctx() const { return _handler_ctx; } + void set_handler_ctx(std::shared_ptr ctx) { DCHECK(_handler != nullptr); _handler_ctx = ctx; } @@ -94,7 +95,7 @@ private: struct evhttp_request* _ev_req = nullptr; HttpHandler* _handler = nullptr; - void* _handler_ctx = nullptr; + std::shared_ptr _handler_ctx; std::string _request_body; }; diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index 9324a83f27..3cb865be40 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -38,6 +38,7 @@ #include "olap/iterators.h" #include "runtime/exec_env.h" #include "runtime/stream_load/new_load_stream_mgr.h" +#include "runtime/stream_load/stream_load_context.h" namespace doris { @@ -190,10 +191,11 @@ Status FileFactory::create_file_reader(RuntimeProfile* /*profile*/, // file scan node/stream load pipe Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader) { - *file_reader = ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id); - if (!(*file_reader)) { + auto stream_load_ctx = ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id); + if (!stream_load_ctx) { return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string()); } + *file_reader = stream_load_ctx->pipe; return Status::OK(); } diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index f2a2c9e798..02644f186d 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -516,7 +516,8 @@ void FragmentMgr::_exec_actual(std::shared_ptr exec_state, Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) { if (params.txn_conf.need_txn) { - StreamLoadContext* stream_load_ctx = new StreamLoadContext(_exec_env); + std::shared_ptr stream_load_ctx = + std::make_shared(_exec_env); stream_load_ctx->db = params.txn_conf.db; stream_load_ctx->db_id = params.txn_conf.db_id; stream_load_ctx->table = params.txn_conf.tbl; @@ -536,9 +537,11 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) { io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, -1 /* total_length */, true /* use_proto */); stream_load_ctx->body_sink = pipe; + stream_load_ctx->pipe = pipe; stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio; - RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(stream_load_ctx->id, pipe)); + RETURN_IF_ERROR( + _exec_env->new_load_stream_mgr()->put(stream_load_ctx->id, stream_load_ctx)); RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx)); set_pipe(params.params.fragment_instance_id, pipe, diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp index f2e3464f47..88c00044e1 100644 --- a/be/src/runtime/routine_load/data_consumer.cpp +++ b/be/src/runtime/routine_load/data_consumer.cpp @@ -36,7 +36,7 @@ namespace doris { static const std::string PROP_GROUP_ID = "group.id"; // init kafka consumer will only set common configs such as // brokers, groupid -Status KafkaDataConsumer::init(StreamLoadContext* ctx) { +Status KafkaDataConsumer::init(std::shared_ptr ctx) { std::unique_lock l(_lock); if (_init) { // this consumer has already been initialized. @@ -139,7 +139,7 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) { Status KafkaDataConsumer::assign_topic_partitions( const std::map& begin_partition_offset, const std::string& topic, - StreamLoadContext* ctx) { + std::shared_ptr ctx) { DCHECK(_k_consumer); // create TopicPartitions std::stringstream ss; @@ -380,7 +380,7 @@ Status KafkaDataConsumer::get_latest_offsets_for_partitions( return Status::OK(); } -Status KafkaDataConsumer::cancel(StreamLoadContext* ctx) { +Status KafkaDataConsumer::cancel(std::shared_ptr ctx) { std::unique_lock l(_lock); if (!_init) { return Status::InternalError("consumer is not initialized"); @@ -413,7 +413,7 @@ Status KafkaDataConsumer::commit(std::vector& offset) // if the kafka brokers and topic are same, // we considered this consumer as matched, thus can be reused. -bool KafkaDataConsumer::match(StreamLoadContext* ctx) { +bool KafkaDataConsumer::match(std::shared_ptr ctx) { if (ctx->load_src_type != TLoadSourceType::KAFKA) { return false; } diff --git a/be/src/runtime/routine_load/data_consumer.h b/be/src/runtime/routine_load/data_consumer.h index afd4d9f6f7..4ad3d6821f 100644 --- a/be/src/runtime/routine_load/data_consumer.h +++ b/be/src/runtime/routine_load/data_consumer.h @@ -33,7 +33,7 @@ class Status; class DataConsumer { public: - DataConsumer(StreamLoadContext* ctx) + DataConsumer() : _id(UniqueId::gen_uid()), _grp_id(UniqueId::gen_uid()), _has_grp(false), @@ -44,18 +44,18 @@ public: virtual ~DataConsumer() {} // init the consumer with the given parameters - virtual Status init(StreamLoadContext* ctx) = 0; + virtual Status init(std::shared_ptr ctx) = 0; // start consuming - virtual Status consume(StreamLoadContext* ctx) = 0; + virtual Status consume(std::shared_ptr ctx) = 0; // cancel the consuming process. // if the consumer is not initialized, or the consuming // process is already finished, call cancel() will // return ERROR - virtual Status cancel(StreamLoadContext* ctx) = 0; + virtual Status cancel(std::shared_ptr ctx) = 0; // reset the data consumer before being reused virtual Status reset() = 0; // return true the if the consumer match the need - virtual bool match(StreamLoadContext* ctx) = 0; + virtual bool match(std::shared_ptr ctx) = 0; const UniqueId& id() { return _id; } time_t last_visit_time() { return _last_visit_time; } @@ -109,10 +109,8 @@ public: class KafkaDataConsumer : public DataConsumer { public: - KafkaDataConsumer(StreamLoadContext* ctx) - : DataConsumer(ctx), - _brokers(ctx->kafka_info->brokers), - _topic(ctx->kafka_info->topic) {} + KafkaDataConsumer(std::shared_ptr ctx) + : _brokers(ctx->kafka_info->brokers), _topic(ctx->kafka_info->topic) {} virtual ~KafkaDataConsumer() { VLOG_NOTICE << "deconstruct consumer"; @@ -123,18 +121,19 @@ public: } } - virtual Status init(StreamLoadContext* ctx) override; + Status init(std::shared_ptr ctx) override; // TODO(cmy): currently do not implement single consumer start method, using group_consume - virtual Status consume(StreamLoadContext* ctx) override { return Status::OK(); } - virtual Status cancel(StreamLoadContext* ctx) override; + Status consume(std::shared_ptr ctx) override { return Status::OK(); } + Status cancel(std::shared_ptr ctx) override; // reassign partition topics virtual Status reset() override; - virtual bool match(StreamLoadContext* ctx) override; + bool match(std::shared_ptr ctx) override; // commit kafka offset Status commit(std::vector& offset); Status assign_topic_partitions(const std::map& begin_partition_offset, - const std::string& topic, StreamLoadContext* ctx); + const std::string& topic, + std::shared_ptr ctx); // start the consumer and put msgs to queue Status group_consume(BlockingQueue* queue, int64_t max_running_time_ms); diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp index 869d427568..2d22fa93ab 100644 --- a/be/src/runtime/routine_load/data_consumer_group.cpp +++ b/be/src/runtime/routine_load/data_consumer_group.cpp @@ -24,7 +24,7 @@ namespace doris { -Status KafkaDataConsumerGroup::assign_topic_partitions(StreamLoadContext* ctx) { +Status KafkaDataConsumerGroup::assign_topic_partitions(std::shared_ptr ctx) { DCHECK(ctx->kafka_info); DCHECK(_consumers.size() >= 1); @@ -63,7 +63,7 @@ KafkaDataConsumerGroup::~KafkaDataConsumerGroup() { DCHECK(_queue.get_size() == 0); } -Status KafkaDataConsumerGroup::start_all(StreamLoadContext* ctx) { +Status KafkaDataConsumerGroup::start_all(std::shared_ptr ctx) { Status result_st = Status::OK(); // start all consumers for (auto& consumer : _consumers) { diff --git a/be/src/runtime/routine_load/data_consumer_group.h b/be/src/runtime/routine_load/data_consumer_group.h index b105a3c9db..bd866f0468 100644 --- a/be/src/runtime/routine_load/data_consumer_group.h +++ b/be/src/runtime/routine_load/data_consumer_group.h @@ -46,7 +46,7 @@ public: } // start all consumers - virtual Status start_all(StreamLoadContext* ctx) { return Status::OK(); } + virtual Status start_all(std::shared_ptr ctx) { return Status::OK(); } protected: UniqueId _grp_id; @@ -68,9 +68,9 @@ public: virtual ~KafkaDataConsumerGroup(); - virtual Status start_all(StreamLoadContext* ctx) override; + Status start_all(std::shared_ptr ctx) override; // assign topic partitions to all consumers equally - Status assign_topic_partitions(StreamLoadContext* ctx); + Status assign_topic_partitions(std::shared_ptr ctx); private: // start a single consumer diff --git a/be/src/runtime/routine_load/data_consumer_pool.cpp b/be/src/runtime/routine_load/data_consumer_pool.cpp index 31f666d3ab..579edfd201 100644 --- a/be/src/runtime/routine_load/data_consumer_pool.cpp +++ b/be/src/runtime/routine_load/data_consumer_pool.cpp @@ -22,7 +22,8 @@ namespace doris { -Status DataConsumerPool::get_consumer(StreamLoadContext* ctx, std::shared_ptr* ret) { +Status DataConsumerPool::get_consumer(std::shared_ptr ctx, + std::shared_ptr* ret) { std::unique_lock l(_lock); // check if there is an available consumer. @@ -58,7 +59,7 @@ Status DataConsumerPool::get_consumer(StreamLoadContext* ctx, std::shared_ptr ctx, std::shared_ptr* ret) { if (ctx->load_src_type != TLoadSourceType::KAFKA) { return Status::InternalError( diff --git a/be/src/runtime/routine_load/data_consumer_pool.h b/be/src/runtime/routine_load/data_consumer_pool.h index 5ea39d0a6c..6f8cf79f74 100644 --- a/be/src/runtime/routine_load/data_consumer_pool.h +++ b/be/src/runtime/routine_load/data_consumer_pool.h @@ -49,10 +49,11 @@ public: // get a already initialized consumer from cache, // if not found in cache, create a new one. - Status get_consumer(StreamLoadContext* ctx, std::shared_ptr* ret); + Status get_consumer(std::shared_ptr ctx, std::shared_ptr* ret); // get several consumers and put them into group - Status get_consumer_grp(StreamLoadContext* ctx, std::shared_ptr* ret); + Status get_consumer_grp(std::shared_ptr ctx, + std::shared_ptr* ret); // return the consumer to the pool void return_consumer(std::shared_ptr consumer); diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index da8842ac59..86f56ee756 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -55,19 +55,13 @@ RoutineLoadTaskExecutor::~RoutineLoadTaskExecutor() { _thread_pool.join(); LOG(INFO) << _task_map.size() << " not executed tasks left, cleanup"; - for (auto it = _task_map.begin(); it != _task_map.end(); ++it) { - auto ctx = it->second; - if (ctx->unref()) { - delete ctx; - } - } _task_map.clear(); } // Create a temp StreamLoadContext and set some kafka connection info in it. // So that we can use this ctx to get kafka data consumer instance. Status RoutineLoadTaskExecutor::_prepare_ctx(const PKafkaMetaProxyRequest& request, - StreamLoadContext* ctx) { + std::shared_ptr ctx) { ctx->load_type = TLoadType::ROUTINE_LOAD; ctx->load_src_type = TLoadSourceType::KAFKA; ctx->label = "NaN"; @@ -93,11 +87,11 @@ Status RoutineLoadTaskExecutor::get_kafka_partition_meta(const PKafkaMetaProxyRe CHECK(request.has_kafka_info()); // This context is meaningless, just for unifing the interface - StreamLoadContext ctx(_exec_env); - RETURN_IF_ERROR(_prepare_ctx(request, &ctx)); + std::shared_ptr ctx = std::make_shared(_exec_env); + RETURN_IF_ERROR(_prepare_ctx(request, ctx)); std::shared_ptr consumer; - RETURN_IF_ERROR(_data_consumer_pool.get_consumer(&ctx, &consumer)); + RETURN_IF_ERROR(_data_consumer_pool.get_consumer(ctx, &consumer)); Status st = std::static_pointer_cast(consumer)->get_partition_meta( partition_ids); @@ -112,11 +106,11 @@ Status RoutineLoadTaskExecutor::get_kafka_partition_offsets_for_times( CHECK(request.has_kafka_info()); // This context is meaningless, just for unifing the interface - StreamLoadContext ctx(_exec_env); - RETURN_IF_ERROR(_prepare_ctx(request, &ctx)); + std::shared_ptr ctx = std::make_shared(_exec_env); + RETURN_IF_ERROR(_prepare_ctx(request, ctx)); std::shared_ptr consumer; - RETURN_IF_ERROR(_data_consumer_pool.get_consumer(&ctx, &consumer)); + RETURN_IF_ERROR(_data_consumer_pool.get_consumer(ctx, &consumer)); Status st = std::static_pointer_cast(consumer)->get_offsets_for_times( std::vector(request.offset_times().begin(), request.offset_times().end()), @@ -132,11 +126,11 @@ Status RoutineLoadTaskExecutor::get_kafka_latest_offsets_for_partitions( CHECK(request.has_kafka_info()); // This context is meaningless, just for unifing the interface - StreamLoadContext ctx(_exec_env); - RETURN_IF_ERROR(_prepare_ctx(request, &ctx)); + std::shared_ptr ctx = std::make_shared(_exec_env); + RETURN_IF_ERROR(_prepare_ctx(request, ctx)); std::shared_ptr consumer; - RETURN_IF_ERROR(_data_consumer_pool.get_consumer(&ctx, &consumer)); + RETURN_IF_ERROR(_data_consumer_pool.get_consumer(ctx, &consumer)); Status st = std::static_pointer_cast(consumer) @@ -168,7 +162,7 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { } // create the context - StreamLoadContext* ctx = new StreamLoadContext(_exec_env); + std::shared_ptr ctx = std::make_shared(_exec_env); ctx->load_type = TLoadType::ROUTINE_LOAD; ctx->load_src_type = task.type; ctx->job_id = task.job_id; @@ -212,35 +206,28 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { break; default: LOG(WARNING) << "unknown load source type: " << task.type; - delete ctx; return Status::InternalError("unknown load source type"); } VLOG_CRITICAL << "receive a new routine load task: " << ctx->brief(); // register the task - ctx->ref(); _task_map[ctx->id] = ctx; // offer the task to thread pool - if (!_thread_pool.offer(std::bind(&RoutineLoadTaskExecutor::exec_task, this, ctx, - &_data_consumer_pool, [this](StreamLoadContext* ctx) { - std::unique_lock l(_lock); - _task_map.erase(ctx->id); - LOG(INFO) << "finished routine load task " - << ctx->brief() - << ", status: " << ctx->status - << ", current tasks num: " - << _task_map.size(); - if (ctx->unref()) { - delete ctx; - } - }))) { + if (!_thread_pool.offer(std::bind( + &RoutineLoadTaskExecutor::exec_task, this, ctx, &_data_consumer_pool, + [this](std::shared_ptr ctx) { + std::unique_lock l(_lock); + ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id); + _task_map.erase(ctx->id); + LOG(INFO) << "finished routine load task " << ctx->brief() + << ", status: " << ctx->status + << ", current tasks num: " << _task_map.size(); + }))) { // failed to submit task, clear and return LOG(WARNING) << "failed to submit routine load task: " << ctx->brief(); + ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id); _task_map.erase(ctx->id); - if (ctx->unref()) { - delete ctx; - } return Status::InternalError("failed to submit routine load task"); } else { LOG(INFO) << "submit a new routine load task: " << ctx->brief() @@ -249,8 +236,8 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { } } -void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool* consumer_pool, - ExecFinishCallback cb) { +void RoutineLoadTaskExecutor::exec_task(std::shared_ptr ctx, + DataConsumerPool* consumer_pool, ExecFinishCallback cb) { #define HANDLE_ERROR(stmt, err_msg) \ do { \ Status _status_ = (stmt); \ @@ -292,7 +279,7 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool ctx->body_sink = pipe; // must put pipe before executing plan fragment - HANDLE_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, pipe), "failed to add pipe"); + HANDLE_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx), "failed to add pipe"); #ifndef BE_TEST // execute plan fragment, async @@ -316,7 +303,7 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool consumer_pool->return_consumers(consumer_grp.get()); // commit txn - HANDLE_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx), "commit failed"); + HANDLE_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()), "commit failed"); // commit kafka offset switch (ctx->load_src_type) { @@ -358,12 +345,12 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool cb(ctx); } -void RoutineLoadTaskExecutor::err_handler(StreamLoadContext* ctx, const Status& st, +void RoutineLoadTaskExecutor::err_handler(std::shared_ptr ctx, const Status& st, const std::string& err_msg) { LOG(WARNING) << err_msg << ", routine load task: " << ctx->brief(true); ctx->status = st; if (ctx->need_rollback) { - _exec_env->stream_load_executor()->rollback_txn(ctx); + _exec_env->stream_load_executor()->rollback_txn(ctx.get()); ctx->need_rollback = false; } if (ctx->body_sink != nullptr) { @@ -372,10 +359,10 @@ void RoutineLoadTaskExecutor::err_handler(StreamLoadContext* ctx, const Status& } // for test only -Status RoutineLoadTaskExecutor::_execute_plan_for_test(StreamLoadContext* ctx) { +Status RoutineLoadTaskExecutor::_execute_plan_for_test(std::shared_ptr ctx) { auto mock_consumer = [this, ctx]() { - ctx->ref(); - std::shared_ptr pipe = _exec_env->new_load_stream_mgr()->get(ctx->id); + std::shared_ptr pipe = std::static_pointer_cast( + _exec_env->new_load_stream_mgr()->get(ctx->id)->body_sink); std::stringstream ss; while (true) { char one; @@ -403,9 +390,6 @@ Status RoutineLoadTaskExecutor::_execute_plan_for_test(StreamLoadContext* ctx) { ss << one; } } - if (ctx->unref()) { - delete ctx; - } }; std::thread t1(mock_consumer); diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h index 6c391a6888..8b46e7bd8d 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.h +++ b/be/src/runtime/routine_load/routine_load_task_executor.h @@ -19,6 +19,7 @@ #include #include +#include #include #include "gen_cpp/internal_service.pb.h" @@ -40,7 +41,7 @@ class TRoutineLoadTask; // to FE finally. class RoutineLoadTaskExecutor { public: - typedef std::function ExecFinishCallback; + using ExecFinishCallback = std::function)>; RoutineLoadTaskExecutor(ExecEnv* exec_env); @@ -60,14 +61,17 @@ public: private: // execute the task - void exec_task(StreamLoadContext* ctx, DataConsumerPool* pool, ExecFinishCallback cb); + void exec_task(std::shared_ptr ctx, DataConsumerPool* pool, + ExecFinishCallback cb); - void err_handler(StreamLoadContext* ctx, const Status& st, const std::string& err_msg); + void err_handler(std::shared_ptr ctx, const Status& st, + const std::string& err_msg); // for test only - Status _execute_plan_for_test(StreamLoadContext* ctx); + Status _execute_plan_for_test(std::shared_ptr ctx); // create a dummy StreamLoadContext for PKafkaMetaProxyRequest - Status _prepare_ctx(const PKafkaMetaProxyRequest& request, StreamLoadContext* ctx); + Status _prepare_ctx(const PKafkaMetaProxyRequest& request, + std::shared_ptr ctx); private: ExecEnv* _exec_env; @@ -76,7 +80,7 @@ private: std::mutex _lock; // task id -> load context - std::unordered_map _task_map; + std::unordered_map> _task_map; }; } // namespace doris diff --git a/be/src/runtime/stream_load/new_load_stream_mgr.h b/be/src/runtime/stream_load/new_load_stream_mgr.h index 9ab2030487..bb245232dc 100644 --- a/be/src/runtime/stream_load/new_load_stream_mgr.h +++ b/be/src/runtime/stream_load/new_load_stream_mgr.h @@ -21,19 +21,20 @@ #include #include -#include "io/fs/stream_load_pipe.h" +#include "common/status.h" #include "util/doris_metrics.h" #include "util/uid_util.h" namespace doris { +class StreamLoadContext; // used to register all streams in process so that other module can get this stream class NewLoadStreamMgr { public: NewLoadStreamMgr(); ~NewLoadStreamMgr(); - Status put(const UniqueId& id, std::shared_ptr stream) { + Status put(const UniqueId& id, std::shared_ptr stream) { std::lock_guard l(_lock); auto it = _stream_map.find(id); if (it != std::end(_stream_map)) { @@ -44,15 +45,13 @@ public: return Status::OK(); } - std::shared_ptr get(const UniqueId& id) { + std::shared_ptr get(const UniqueId& id) { std::lock_guard l(_lock); auto it = _stream_map.find(id); if (it == std::end(_stream_map)) { - return nullptr; + return std::shared_ptr(nullptr); } - auto stream = it->second; - _stream_map.erase(it); - return stream; + return it->second; } void remove(const UniqueId& id) { @@ -66,6 +65,6 @@ public: private: std::mutex _lock; - std::unordered_map> _stream_map; + std::unordered_map> _stream_map; }; } // namespace doris diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index ef3602ab8f..8951630afb 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -27,7 +27,9 @@ #include "common/utils.h" #include "gen_cpp/BackendService_types.h" #include "gen_cpp/FrontendService_types.h" +#include "io/fs/stream_load_pipe.h" #include "runtime/exec_env.h" +#include "runtime/message_body_sink.h" #include "runtime/stream_load/new_load_stream_mgr.h" #include "runtime/stream_load/stream_load_executor.h" #include "service/backend_options.h" @@ -82,7 +84,7 @@ class MessageBodySink; class StreamLoadContext { public: - StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()), _exec_env(exec_env), _refs(0) { + StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()), _exec_env(exec_env) { start_millis = UnixMillis(); } @@ -91,8 +93,6 @@ public: _exec_env->stream_load_executor()->rollback_txn(this); need_rollback = false; } - - _exec_env->new_load_stream_mgr()->remove(id); } std::string to_json() const; @@ -109,10 +109,6 @@ public: // also print the load source info if detail is set to true std::string brief(bool detail = false) const; - void ref() { _refs.fetch_add(1); } - // If unref() returns true, this object should be delete - bool unref() { return _refs.fetch_sub(1) == 1; } - public: // load type, eg: ROUTINE LOAD/MANUAL LOAD TLoadType::type load_type; @@ -164,6 +160,7 @@ public: TFileCompressType::type compress_type = TFileCompressType::UNKNOWN; std::shared_ptr body_sink; + std::shared_ptr pipe; TStreamLoadPutResult put_result; @@ -211,7 +208,6 @@ public: private: ExecEnv* _exec_env; - std::atomic _refs; }; } // namespace doris diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 61244bbd9f..e9d6c105f9 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -39,15 +39,15 @@ TLoadTxnRollbackResult k_stream_load_rollback_result; Status k_stream_load_plan_status; #endif -Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) { +Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr ctx) { // submit this params #ifndef BE_TEST - ctx->ref(); ctx->start_write_data_nanos = MonotonicNanos(); LOG(INFO) << "begin to execute job. label=" << ctx->label << ", txn_id=" << ctx->txn_id << ", query_id=" << print_id(ctx->put_result.params.params.query_id); auto st = _exec_env->fragment_mgr()->exec_plan_fragment( ctx->put_result.params, [ctx, this](RuntimeState* state, Status* status) { + ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id); ctx->commit_infos = std::move(state->tablet_commit_infos()); if (status->ok()) { ctx->number_total_rows = state->num_rows_load_total(); @@ -110,19 +110,14 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) { if (ctx->need_commit_self && ctx->body_sink != nullptr) { if (ctx->body_sink->cancelled() || !status->ok()) { ctx->status = *status; - this->rollback_txn(ctx); + this->rollback_txn(ctx.get()); } else { - this->commit_txn(ctx); + this->commit_txn(ctx.get()); } } - - if (ctx->unref()) { - delete ctx; - } }); if (!st.ok()) { // no need to check unref's return value - ctx->unref(); return st; } #else diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/be/src/runtime/stream_load/stream_load_executor.h index 2c464dea75..b90cef0a06 100644 --- a/be/src/runtime/stream_load/stream_load_executor.h +++ b/be/src/runtime/stream_load/stream_load_executor.h @@ -44,7 +44,7 @@ public: void rollback_txn(StreamLoadContext* ctx); - Status execute_plan_fragment(StreamLoadContext* ctx); + Status execute_plan_fragment(std::shared_ptr ctx); private: // collect the load statistics from context and set them to stat