diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 5b6e393fee..deb99682fe 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -744,8 +744,8 @@ DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1"); DEFINE_Bool(share_delta_writers, "true"); // number of brpc stream per load DEFINE_Int32(num_streams_per_load, "5"); -// timeout for open stream sink rpc in ms -DEFINE_Int64(open_stream_sink_timeout_ms, "500"); +// timeout for open load stream rpc in ms +DEFINE_Int64(open_load_stream_timeout_ms, "500"); // max send batch parallelism for OlapTableSink // The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job, diff --git a/be/src/common/config.h b/be/src/common/config.h index 71c0a1e12c..9855ba0ffe 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -801,8 +801,8 @@ DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio); DECLARE_Bool(share_delta_writers); // number of brpc stream per load DECLARE_Int32(num_streams_per_load); -// timeout for open stream sink rpc in ms -DECLARE_Int64(open_stream_sink_timeout_ms); +// timeout for open load stream rpc in ms +DECLARE_Int64(open_load_stream_timeout_ms); // max send batch parallelism for OlapTableSink // The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job, diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 611e45edc1..8f9ed8c0d0 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -355,7 +355,7 @@ void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcControl } } -void PInternalServiceImpl::open_stream_sink(google::protobuf::RpcController* controller, +void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* controller, const POpenStreamSinkRequest* request, POpenStreamSinkResponse* response, google::protobuf::Closure* done) { @@ -365,7 +365,7 @@ void PInternalServiceImpl::open_stream_sink(google::protobuf::RpcController* con brpc::Controller* cntl = static_cast(controller); brpc::StreamOptions stream_options; - LOG(INFO) << "open stream sink, load_id = " << request->load_id() + LOG(INFO) << "open load stream, load_id = " << request->load_id() << ", src_id = " << request->src_id(); for (const auto& req : request->tablets()) { diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index db0ee07581..d5712e654c 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -92,7 +92,7 @@ public: PTabletWriterOpenResult* response, google::protobuf::Closure* done) override; - void open_stream_sink(google::protobuf::RpcController* controller, + void open_load_stream(google::protobuf::RpcController* controller, const POpenStreamSinkRequest* request, POpenStreamSinkResponse* response, google::protobuf::Closure* done) override; diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 821c46dc43..60ef352ddb 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -101,7 +101,7 @@ LoadStreamStub::~LoadStreamStub() { } } -// open_stream_sink +// open_load_stream // tablets means Status LoadStreamStub::open(BrpcClientCache* client_cache, const NodeInfo& node_info, int64_t txn_id, @@ -124,7 +124,7 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, if (int ret = StreamCreate(&_stream_id, cntl, &opt)) { return Status::Error(ret, "Failed to create stream"); } - cntl.set_timeout_ms(config::open_stream_sink_timeout_ms); + cntl.set_timeout_ms(config::open_load_stream_timeout_ms); POpenStreamSinkRequest request; *request.mutable_load_id() = _load_id; request.set_src_id(_src_id); @@ -138,7 +138,7 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, // use "pooled" connection to avoid conflicts between streaming rpc and regular rpc, // see: https://github.com/apache/brpc/issues/392 const auto& stub = client_cache->get_new_client_no_cache(host_port, "baidu_std", "pooled"); - stub->open_stream_sink(&cntl, &request, &response, nullptr); + stub->open_load_stream(&cntl, &request, &response, nullptr); for (const auto& resp : response.tablet_schemas()) { auto tablet_schema = std::make_unique(); tablet_schema->init_from_pb(resp.tablet_schema()); diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index bd3ddec0d6..60a4842761 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -145,7 +145,7 @@ public: #endif ~LoadStreamStub(); - // open_stream_sink + // open_load_stream Status open(BrpcClientCache* client_cache, const NodeInfo& node_info, int64_t txn_id, const OlapTableSchemaParam& schema, const std::vector& tablets_for_schema, bool enable_profile); diff --git a/be/test/runtime/load_stream_test.cpp b/be/test/runtime/load_stream_test.cpp index 043c9a10d7..c733d78722 100644 --- a/be/test/runtime/load_stream_test.cpp +++ b/be/test/runtime/load_stream_test.cpp @@ -347,7 +347,7 @@ public: StreamService(LoadStreamMgr* load_stream_mgr) : _sd(brpc::INVALID_STREAM_ID), _load_stream_mgr(load_stream_mgr) {} virtual ~StreamService() { brpc::StreamClose(_sd); }; - virtual void open_stream_sink(google::protobuf::RpcController* controller, + virtual void open_load_stream(google::protobuf::RpcController* controller, const POpenStreamSinkRequest* request, POpenStreamSinkResponse* response, google::protobuf::Closure* done) { @@ -453,11 +453,11 @@ public: auto ptablet = request.add_tablets(); ptablet->set_tablet_id(NORMAL_TABLET_ID); ptablet->set_index_id(NORMAL_INDEX_ID); - stub.open_stream_sink(&_cntl, &request, &response, nullptr); + stub.open_load_stream(&_cntl, &request, &response, nullptr); if (_cntl.Failed()) { - std::cerr << "open_stream_sink failed" << std::endl; - LOG(ERROR) << "Fail to open stream sink " << _cntl.ErrorText(); - return Status::InternalError("Fail to open stream sink"); + std::cerr << "open_load_stream failed" << std::endl; + LOG(ERROR) << "Fail to open load stream " << _cntl.ErrorText(); + return Status::InternalError("Fail to open load stream"); } return Status::OK(); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 8f0dc34e9d..5881e5bf2a 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -796,7 +796,7 @@ service PBackendService { rpc cancel_plan_fragment(PCancelPlanFragmentRequest) returns (PCancelPlanFragmentResult); rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult); rpc tablet_writer_open(PTabletWriterOpenRequest) returns (PTabletWriterOpenResult); - rpc open_stream_sink(POpenStreamSinkRequest) returns (POpenStreamSinkResponse); + rpc open_load_stream(POpenStreamSinkRequest) returns (POpenStreamSinkResponse); rpc tablet_writer_add_block(PTabletWriterAddBlockRequest) returns (PTabletWriterAddBlockResult); rpc tablet_writer_add_block_by_http(PEmptyRequest) returns (PTabletWriterAddBlockResult); rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns (PTabletWriterCancelResult);