[refactor](move-memtable) rename open_stream_sink rpc to open_load_stream (#25883)

This commit is contained in:
Kaijie Chen
2023-10-29 10:07:14 +08:00
committed by GitHub
parent bd2f007d52
commit 6a85f46ff3
8 changed files with 17 additions and 17 deletions

View File

@ -744,8 +744,8 @@ DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1");
DEFINE_Bool(share_delta_writers, "true");
// number of brpc stream per load
DEFINE_Int32(num_streams_per_load, "5");
// timeout for open stream sink rpc in ms
DEFINE_Int64(open_stream_sink_timeout_ms, "500");
// timeout for open load stream rpc in ms
DEFINE_Int64(open_load_stream_timeout_ms, "500");
// max send batch parallelism for OlapTableSink
// The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job,

View File

@ -801,8 +801,8 @@ DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio);
DECLARE_Bool(share_delta_writers);
// number of brpc stream per load
DECLARE_Int32(num_streams_per_load);
// timeout for open stream sink rpc in ms
DECLARE_Int64(open_stream_sink_timeout_ms);
// timeout for open load stream rpc in ms
DECLARE_Int64(open_load_stream_timeout_ms);
// max send batch parallelism for OlapTableSink
// The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job,

View File

@ -355,7 +355,7 @@ void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcControl
}
}
void PInternalServiceImpl::open_stream_sink(google::protobuf::RpcController* controller,
void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* controller,
const POpenStreamSinkRequest* request,
POpenStreamSinkResponse* response,
google::protobuf::Closure* done) {
@ -365,7 +365,7 @@ void PInternalServiceImpl::open_stream_sink(google::protobuf::RpcController* con
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
brpc::StreamOptions stream_options;
LOG(INFO) << "open stream sink, load_id = " << request->load_id()
LOG(INFO) << "open load stream, load_id = " << request->load_id()
<< ", src_id = " << request->src_id();
for (const auto& req : request->tablets()) {

View File

@ -92,7 +92,7 @@ public:
PTabletWriterOpenResult* response,
google::protobuf::Closure* done) override;
void open_stream_sink(google::protobuf::RpcController* controller,
void open_load_stream(google::protobuf::RpcController* controller,
const POpenStreamSinkRequest* request, POpenStreamSinkResponse* response,
google::protobuf::Closure* done) override;

View File

@ -101,7 +101,7 @@ LoadStreamStub::~LoadStreamStub() {
}
}
// open_stream_sink
// open_load_stream
// tablets means
Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
const NodeInfo& node_info, int64_t txn_id,
@ -124,7 +124,7 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
if (int ret = StreamCreate(&_stream_id, cntl, &opt)) {
return Status::Error<true>(ret, "Failed to create stream");
}
cntl.set_timeout_ms(config::open_stream_sink_timeout_ms);
cntl.set_timeout_ms(config::open_load_stream_timeout_ms);
POpenStreamSinkRequest request;
*request.mutable_load_id() = _load_id;
request.set_src_id(_src_id);
@ -138,7 +138,7 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
// use "pooled" connection to avoid conflicts between streaming rpc and regular rpc,
// see: https://github.com/apache/brpc/issues/392
const auto& stub = client_cache->get_new_client_no_cache(host_port, "baidu_std", "pooled");
stub->open_stream_sink(&cntl, &request, &response, nullptr);
stub->open_load_stream(&cntl, &request, &response, nullptr);
for (const auto& resp : response.tablet_schemas()) {
auto tablet_schema = std::make_unique<TabletSchema>();
tablet_schema->init_from_pb(resp.tablet_schema());

View File

@ -145,7 +145,7 @@ public:
#endif
~LoadStreamStub();
// open_stream_sink
// open_load_stream
Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info,
int64_t txn_id, const OlapTableSchemaParam& schema,
const std::vector<PTabletID>& tablets_for_schema, bool enable_profile);

View File

@ -347,7 +347,7 @@ public:
StreamService(LoadStreamMgr* load_stream_mgr)
: _sd(brpc::INVALID_STREAM_ID), _load_stream_mgr(load_stream_mgr) {}
virtual ~StreamService() { brpc::StreamClose(_sd); };
virtual void open_stream_sink(google::protobuf::RpcController* controller,
virtual void open_load_stream(google::protobuf::RpcController* controller,
const POpenStreamSinkRequest* request,
POpenStreamSinkResponse* response,
google::protobuf::Closure* done) {
@ -453,11 +453,11 @@ public:
auto ptablet = request.add_tablets();
ptablet->set_tablet_id(NORMAL_TABLET_ID);
ptablet->set_index_id(NORMAL_INDEX_ID);
stub.open_stream_sink(&_cntl, &request, &response, nullptr);
stub.open_load_stream(&_cntl, &request, &response, nullptr);
if (_cntl.Failed()) {
std::cerr << "open_stream_sink failed" << std::endl;
LOG(ERROR) << "Fail to open stream sink " << _cntl.ErrorText();
return Status::InternalError("Fail to open stream sink");
std::cerr << "open_load_stream failed" << std::endl;
LOG(ERROR) << "Fail to open load stream " << _cntl.ErrorText();
return Status::InternalError("Fail to open load stream");
}
return Status::OK();

View File

@ -796,7 +796,7 @@ service PBackendService {
rpc cancel_plan_fragment(PCancelPlanFragmentRequest) returns (PCancelPlanFragmentResult);
rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult);
rpc tablet_writer_open(PTabletWriterOpenRequest) returns (PTabletWriterOpenResult);
rpc open_stream_sink(POpenStreamSinkRequest) returns (POpenStreamSinkResponse);
rpc open_load_stream(POpenStreamSinkRequest) returns (POpenStreamSinkResponse);
rpc tablet_writer_add_block(PTabletWriterAddBlockRequest) returns (PTabletWriterAddBlockResult);
rpc tablet_writer_add_block_by_http(PEmptyRequest) returns (PTabletWriterAddBlockResult);
rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns (PTabletWriterCancelResult);