[refactor](remove unused code) remove load stream mgr (#16580)

remove the old FileReader-based stream load pipe (runtime/stream_load/stream_load_pipe.h)
remove the old stream load manager (LoadStreamMgr); NewLoadStreamMgr with io::StreamLoadPipe is now the only path
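
For reference, a minimal sketch of the surviving path, pieced together from the
StreamLoadAction::_process_put hunk and the retained FileFactory::create_pipe_reader
declaration below; ctx, _exec_env and the buffer constants are as they appear in
those hunks, nothing here is a new API:

    // Producer side: build an io::StreamLoadPipe and register it under the
    // load id with the retained NewLoadStreamMgr.
    auto pipe = std::make_shared<io::StreamLoadPipe>(
            io::kMaxPipeBufferedBytes /* max_buffered_bytes */,
            64 * 1024 /* min_chunk_size */,
            ctx->body_bytes /* total_length */);
    RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, pipe));
    // Consumer side: readers now resolve the pipe through the kept overload
    // FileFactory::create_pipe_reader(load_id, io::FileReaderSPtr*) instead of
    // the removed LoadStreamMgr::get() lookup.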

---------

Co-authored-by: yiguolei <yiguolei@gmail.com>
Author: yiguolei
Date: 2023-02-10 07:46:18 +08:00
Commit: 4fcd6cd236 (parent 438daaaf1c, committed via GitHub)
18 changed files with 2 additions and 1036 deletions


@@ -397,7 +397,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
request.__set_loadId(ctx->id.to_thrift());
if (ctx->use_streaming) {
auto pipe = std::make_shared<io::StreamLoadPipe>(
kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
ctx->body_bytes /* total_length */);
RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, pipe));


@@ -37,7 +37,6 @@
#include "io/s3_writer.h"
#include "olap/iterators.h"
#include "runtime/exec_env.h"
#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
namespace doris {
@@ -198,15 +197,6 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS
return Status::OK();
}
Status FileFactory::create_pipe_reader(const TUniqueId& load_id,
std::shared_ptr<FileReader>& file_reader) {
file_reader = ExecEnv::GetInstance()->load_stream_mgr()->get(load_id);
if (!file_reader) {
return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string());
}
return Status::OK();
}
Status FileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path,
std::shared_ptr<io::FileSystem>* hdfs_file_system,
io::FileReaderSPtr* reader,


@@ -80,10 +80,6 @@ public:
// Create FileReader for stream load pipe
static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader);
// [deprecated] Create FileReader for stream load pipe
static Status create_pipe_reader(const TUniqueId& load_id,
std::shared_ptr<FileReader>& file_reader);
static Status create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path,
std::shared_ptr<io::FileSystem>* hdfs_file_system,
io::FileReaderSPtr* reader,


@@ -59,7 +59,6 @@ set(RUNTIME_FILES
stream_load/stream_load_context.cpp
stream_load/stream_load_executor.cpp
stream_load/stream_load_recorder.cpp
stream_load/load_stream_mgr.cpp
stream_load/new_load_stream_mgr.cpp
routine_load/data_consumer.cpp
routine_load/data_consumer_group.cpp


@@ -47,7 +47,6 @@ class ExternalScanContextMgr;
class FragmentMgr;
class ResultCache;
class LoadPathMgr;
class LoadStreamMgr;
class NewLoadStreamMgr;
class MemTrackerLimiter;
class MemTracker;
@@ -156,7 +155,6 @@ public:
return _function_client_cache;
}
LoadChannelMgr* load_channel_mgr() { return _load_channel_mgr; }
LoadStreamMgr* load_stream_mgr() { return _load_stream_mgr; }
NewLoadStreamMgr* new_load_stream_mgr() { return _new_load_stream_mgr; }
SmallFileMgr* small_file_mgr() { return _small_file_mgr; }
BlockSpillManager* block_spill_mgr() { return _block_spill_mgr; }
@@ -230,7 +228,6 @@ private:
BfdParser* _bfd_parser = nullptr;
BrokerMgr* _broker_mgr = nullptr;
LoadChannelMgr* _load_channel_mgr = nullptr;
LoadStreamMgr* _load_stream_mgr = nullptr;
NewLoadStreamMgr* _new_load_stream_mgr = nullptr;
BrpcClientCache<PBackendService_Stub>* _internal_client_cache = nullptr;
BrpcClientCache<PFunctionService_Stub>* _function_client_cache = nullptr;


@@ -42,7 +42,6 @@
#include "runtime/result_queue_mgr.h"
#include "runtime/routine_load/routine_load_task_executor.h"
#include "runtime/small_file_mgr.h"
#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_executor.h"
#include "runtime/tmp_file_mgr.h"
@@ -116,7 +115,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_bfd_parser = BfdParser::create();
_broker_mgr = new BrokerMgr(this);
_load_channel_mgr = new LoadChannelMgr();
_load_stream_mgr = new LoadStreamMgr();
_new_load_stream_mgr = new NewLoadStreamMgr();
_internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
_function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
@@ -313,7 +311,6 @@ void ExecEnv::_destroy() {
_deregister_metrics();
SAFE_DELETE(_internal_client_cache);
SAFE_DELETE(_function_client_cache);
SAFE_DELETE(_load_stream_mgr);
SAFE_DELETE(_load_channel_mgr);
SAFE_DELETE(_broker_mgr);
SAFE_DELETE(_bfd_parser);


@@ -506,7 +506,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
stream_load_ctx->need_commit_self = true;
stream_load_ctx->need_rollback = true;
auto pipe = std::make_shared<io::StreamLoadPipe>(
kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
-1 /* total_length */, true /* use_proto */);
stream_load_ctx->body_sink = pipe;
stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio;


@@ -1,54 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <stdint.h>
#include <map>
#include <string>
#include <vector>
#include "io/file_reader.h"
#include "librdkafka/rdkafka.h"
#include "runtime/message_body_sink.h"
#include "runtime/stream_load/stream_load_pipe.h"
namespace doris {
class KafkaConsumerPipe : public StreamLoadPipe {
public:
KafkaConsumerPipe(size_t max_buffered_bytes = 1024 * 1024, size_t min_chunk_size = 64 * 1024)
: StreamLoadPipe(max_buffered_bytes, min_chunk_size) {}
virtual ~KafkaConsumerPipe() {}
Status append_with_line_delimiter(const char* data, size_t size) {
Status st = append(data, size);
if (!st.ok()) {
return st;
}
// append the line delimiter
st = append("\n", 1);
return st;
}
Status append_json(const char* data, size_t size) { return append_and_flush(data, size); }
};
} // end namespace doris


@@ -1,36 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/stream_load/load_stream_mgr.h"
namespace doris {
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(stream_load_pipe_count, MetricUnit::NOUNIT);
LoadStreamMgr::LoadStreamMgr() {
// Each StreamLoadPipe has a limited buffer size (default 1M), it's not needed to count the
// actual size of all StreamLoadPipe.
REGISTER_HOOK_METRIC(stream_load_pipe_count, [this]() {
// std::lock_guard<std::mutex> l(_lock);
return _stream_map.size();
});
}
LoadStreamMgr::~LoadStreamMgr() {
DEREGISTER_HOOK_METRIC(stream_load_pipe_count);
}
} // namespace doris


@@ -1,72 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <memory>
#include <mutex>
#include <unordered_map>
#include "runtime/stream_load/stream_load_pipe.h" // for StreamLoadPipe
#include "util/doris_metrics.h"
#include "util/uid_util.h" // for std::hash for UniqueId
namespace doris {
// used to register all streams in process so that other module can get this stream
class LoadStreamMgr {
public:
LoadStreamMgr();
~LoadStreamMgr();
Status put(const UniqueId& id, std::shared_ptr<StreamLoadPipe> stream) {
std::lock_guard<std::mutex> l(_lock);
auto it = _stream_map.find(id);
if (it != std::end(_stream_map)) {
return Status::InternalError("id already exist");
}
_stream_map.emplace(id, stream);
VLOG_NOTICE << "put stream load pipe: " << id;
return Status::OK();
}
std::shared_ptr<StreamLoadPipe> get(const UniqueId& id) {
std::lock_guard<std::mutex> l(_lock);
auto it = _stream_map.find(id);
if (it == std::end(_stream_map)) {
return nullptr;
}
auto stream = it->second;
_stream_map.erase(it);
return stream;
}
void remove(const UniqueId& id) {
std::lock_guard<std::mutex> l(_lock);
auto it = _stream_map.find(id);
if (it != std::end(_stream_map)) {
_stream_map.erase(it);
VLOG_NOTICE << "remove stream load pipe: " << id;
}
}
private:
std::mutex _lock;
std::unordered_map<UniqueId, std::shared_ptr<StreamLoadPipe>> _stream_map;
};
} // namespace doris


@@ -28,7 +28,6 @@
namespace doris {
// used to register all streams in process so that other module can get this stream
// TODO(ftw): should be renamed to `LoadStreamMgr` after new file reader is ready.
class NewLoadStreamMgr {
public:
NewLoadStreamMgr();


@@ -28,7 +28,6 @@
#include "gen_cpp/BackendService_types.h"
#include "gen_cpp/FrontendService_types.h"
#include "runtime/exec_env.h"
#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_executor.h"
#include "service/backend_options.h"


@@ -1,282 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <condition_variable>
#include <deque>
#include <mutex>
#include "gen_cpp/internal_service.pb.h"
#include "io/file_reader.h"
#include "runtime/message_body_sink.h"
#include "runtime/thread_context.h"
#include "util/bit_util.h"
#include "util/byte_buffer.h"
namespace doris {
const size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024;
// StreamLoadPipe is used to transfer data from producer to consumer
// Data in the pipe is stored in chunks.
class StreamLoadPipe : public MessageBodySink, public FileReader {
public:
StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes,
size_t min_chunk_size = 64 * 1024, int64_t total_length = -1,
bool use_proto = false)
: _buffered_bytes(0),
_proto_buffered_bytes(0),
_max_buffered_bytes(max_buffered_bytes),
_min_chunk_size(min_chunk_size),
_total_length(total_length),
_use_proto(use_proto) {}
~StreamLoadPipe() override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
while (!_buf_queue.empty()) {
_buf_queue.pop_front();
}
}
Status open() override { return Status::OK(); }
Status append_and_flush(const char* data, size_t size, size_t proto_byte_size = 0) {
ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size + 1));
buf->put_bytes(data, size);
buf->flip();
return _append(buf, proto_byte_size);
}
Status append(const char* data, size_t size) override {
size_t pos = 0;
if (_write_buf != nullptr) {
if (size < _write_buf->remaining()) {
_write_buf->put_bytes(data, size);
return Status::OK();
} else {
pos = _write_buf->remaining();
_write_buf->put_bytes(data, pos);
_write_buf->flip();
RETURN_IF_ERROR(_append(_write_buf));
_write_buf.reset();
}
}
// need to allocate a new chunk, min chunk is 64k
size_t chunk_size = std::max(_min_chunk_size, size - pos);
chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size);
_write_buf = ByteBuffer::allocate(chunk_size);
_write_buf->put_bytes(data + pos, size - pos);
return Status::OK();
}
Status append(const ByteBufferPtr& buf) override {
if (_write_buf != nullptr) {
_write_buf->flip();
RETURN_IF_ERROR(_append(_write_buf));
_write_buf.reset();
}
return _append(buf);
}
// If _total_length == -1, this should be a Kafka routine load task,
// just get the next buffer directly from the buffer queue, because one buffer contains a complete piece of data.
// Otherwise, this should be a stream load task that needs to read the specified amount of data.
Status read_one_message(std::unique_ptr<uint8_t[]>* data, int64_t* length) override {
if (_total_length < -1) {
return Status::InternalError("invalid, _total_length is: {}", _total_length);
} else if (_total_length == 0) {
// no data
*length = 0;
return Status::OK();
}
if (_total_length == -1) {
return _read_next_buffer(data, length);
}
// _total_length > 0, read the entire data
data->reset(new uint8_t[_total_length]);
bool eof = false;
Status st = read(data->get(), _total_length, length, &eof);
if (eof) {
*length = 0;
}
return st;
}
Status read(uint8_t* data, int64_t data_size, int64_t* bytes_read, bool* eof) override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
*bytes_read = 0;
while (*bytes_read < data_size) {
std::unique_lock<std::mutex> l(_lock);
while (!_cancelled && !_finished && _buf_queue.empty()) {
_get_cond.wait(l);
}
// cancelled
if (_cancelled) {
return Status::InternalError("cancelled: {}", _cancelled_reason);
}
// finished
if (_buf_queue.empty()) {
DCHECK(_finished);
data_size = *bytes_read;
*eof = (*bytes_read == 0);
return Status::OK();
}
auto buf = _buf_queue.front();
int64_t copy_size = std::min(data_size - *bytes_read, (int64_t)buf->remaining());
buf->get_bytes((char*)data + *bytes_read, copy_size);
*bytes_read += copy_size;
if (!buf->has_remaining()) {
_buf_queue.pop_front();
_buffered_bytes -= buf->limit;
_put_cond.notify_one();
}
}
DCHECK(*bytes_read == data_size)
<< "*bytes_read=" << *bytes_read << ", data_size=" << data_size;
*eof = false;
return Status::OK();
}
Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override {
return Status::InternalError("Not implemented");
}
int64_t size() override { return 0; }
Status seek(int64_t position) override { return Status::InternalError("Not implemented"); }
Status tell(int64_t* position) override { return Status::InternalError("Not implemented"); }
// called when consumer finished
void close() override { cancel("closed"); }
bool closed() override { return _cancelled; }
// called when producer finished
Status finish() override {
if (_write_buf != nullptr) {
_write_buf->flip();
_append(_write_buf);
_write_buf.reset();
}
{
std::lock_guard<std::mutex> l(_lock);
_finished = true;
}
_get_cond.notify_all();
return Status::OK();
}
// called when producer/consumer failed
void cancel(const std::string& reason) override {
{
std::lock_guard<std::mutex> l(_lock);
_cancelled = true;
_cancelled_reason = reason;
}
_get_cond.notify_all();
_put_cond.notify_all();
}
private:
// read the next buffer from _buf_queue
Status _read_next_buffer(std::unique_ptr<uint8_t[]>* data, int64_t* length) {
std::unique_lock<std::mutex> l(_lock);
while (!_cancelled && !_finished && _buf_queue.empty()) {
_get_cond.wait(l);
}
// cancelled
if (_cancelled) {
return Status::InternalError("cancelled: {}", _cancelled_reason);
}
// finished
if (_buf_queue.empty()) {
DCHECK(_finished);
data->reset();
*length = 0;
return Status::OK();
}
auto buf = _buf_queue.front();
*length = buf->remaining();
data->reset(new uint8_t[*length]);
buf->get_bytes((char*)(data->get()), *length);
_buf_queue.pop_front();
_buffered_bytes -= buf->limit;
if (_use_proto) {
PDataRow** ptr = reinterpret_cast<PDataRow**>(data->get());
_proto_buffered_bytes -= (sizeof(PDataRow*) + (*ptr)->GetCachedSize());
}
_put_cond.notify_one();
return Status::OK();
}
Status _append(const ByteBufferPtr& buf, size_t proto_byte_size = 0) {
{
std::unique_lock<std::mutex> l(_lock);
// if _buf_queue is empty, we append this buf without size check
if (_use_proto) {
while (!_cancelled && !_buf_queue.empty() &&
(_proto_buffered_bytes + proto_byte_size > _max_buffered_bytes)) {
_put_cond.wait(l);
}
} else {
while (!_cancelled && !_buf_queue.empty() &&
_buffered_bytes + buf->remaining() > _max_buffered_bytes) {
_put_cond.wait(l);
}
}
if (_cancelled) {
return Status::InternalError("cancelled: {}", _cancelled_reason);
}
_buf_queue.push_back(buf);
if (_use_proto) {
_proto_buffered_bytes += proto_byte_size;
} else {
_buffered_bytes += buf->remaining();
}
}
_get_cond.notify_one();
return Status::OK();
}
// Blocking queue
std::mutex _lock;
size_t _buffered_bytes;
size_t _proto_buffered_bytes;
size_t _max_buffered_bytes;
size_t _min_chunk_size;
// The total amount of data expected to be read.
// In some scenarios, such as loading json format data through stream load,
// the data needs to be completely read before it can be parsed,
// so the total size of the data needs to be known.
// The default is -1, which means that the data arrives in a stream
// and the length is unknown.
// size_t is unsigned, so use int64_t
int64_t _total_length = -1;
bool _use_proto = false;
std::deque<ByteBufferPtr> _buf_queue;
std::condition_variable _put_cond;
std::condition_variable _get_cond;
ByteBufferPtr _write_buf;
};
} // namespace doris


@@ -66,7 +66,6 @@ set(HTTP_TEST_FILES
http/http_utils_test.cpp
http/http_client_test.cpp
# TODO this will override HttpChannel and make other tests fail
# http/stream_load_test.cpp
# http/metrics_action_test.cpp
)
set(IO_TEST_FILES
@@ -145,10 +144,8 @@ set(RUNTIME_TEST_FILES
runtime/string_value_test.cpp
runtime/fragment_mgr_test.cpp
runtime/mem_limit_test.cpp
runtime/stream_load_pipe_test.cpp
runtime/snapshot_loader_test.cpp
runtime/user_function_cache_test.cpp
runtime/kafka_consumer_pipe_test.cpp
runtime/routine_load_task_executor_test.cpp
runtime/small_file_mgr_test.cpp
runtime/heartbeat_flags_test.cpp


@@ -1,234 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "http/action/stream_load.h"
#include <event2/http.h>
#include <event2/http_struct.h>
#include <gtest/gtest.h>
#include <rapidjson/document.h>
#include "exec/schema_scanner/schema_helper.h"
#include "gen_cpp/HeartbeatService_types.h"
#include "http/http_channel.h"
#include "http/http_request.h"
#include "runtime/exec_env.h"
#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/stream_load/stream_load_executor.h"
#include "util/brpc_client_cache.h"
#include "util/cpu_info.h"
struct mg_connection;
namespace doris {
std::string k_response_str;
// Send Unauthorized status with basic challenge
void HttpChannel::send_basic_challenge(HttpRequest* req, const std::string& realm) {}
void HttpChannel::send_error(HttpRequest* request, HttpStatus status) {}
void HttpChannel::send_reply(HttpRequest* request, HttpStatus status) {}
void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const std::string& content) {
k_response_str = content;
}
void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t size) {}
extern TLoadTxnBeginResult k_stream_load_begin_result;
extern TLoadTxnCommitResult k_stream_load_commit_result;
extern TLoadTxnRollbackResult k_stream_load_rollback_result;
extern TStreamLoadPutResult k_stream_load_put_result;
extern Status k_stream_load_plan_status;
class StreamLoadActionTest : public testing::Test {
public:
StreamLoadActionTest() {}
virtual ~StreamLoadActionTest() {}
void SetUp() override {
k_stream_load_begin_result = TLoadTxnBeginResult();
k_stream_load_commit_result = TLoadTxnCommitResult();
k_stream_load_rollback_result = TLoadTxnRollbackResult();
k_stream_load_put_result = TStreamLoadPutResult();
k_stream_load_plan_status = Status::OK();
k_response_str = "";
config::streaming_load_max_mb = 1;
_env._master_info = new TMasterInfo();
_env._load_stream_mgr = new LoadStreamMgr();
_env._internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
_env._function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
_env._stream_load_executor = new StreamLoadExecutor(&_env);
_evhttp_req = evhttp_request_new(nullptr, nullptr);
}
void TearDown() override {
delete _env._internal_client_cache;
_env._internal_client_cache = nullptr;
delete _env._function_client_cache;
_env._function_client_cache = nullptr;
delete _env._load_stream_mgr;
_env._load_stream_mgr = nullptr;
delete _env._master_info;
_env._master_info = nullptr;
delete _env._stream_load_executor;
_env._stream_load_executor = nullptr;
if (_evhttp_req != nullptr) {
evhttp_request_free(_evhttp_req);
}
}
private:
ExecEnv _env;
evhttp_request* _evhttp_req = nullptr;
};
TEST_F(StreamLoadActionTest, no_auth) {
StreamLoadAction action(&_env);
HttpRequest request(_evhttp_req);
request.set_handler(&action);
action.on_header(&request);
action.handle(&request);
rapidjson::Document doc;
doc.Parse(k_response_str.c_str());
EXPECT_STREQ("Fail", doc["Status"].GetString());
}
TEST_F(StreamLoadActionTest, normal) {
StreamLoadAction action(&_env);
HttpRequest request(_evhttp_req);
struct evhttp_request ev_req;
ev_req.remote_host = nullptr;
request._ev_req = &ev_req;
request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo=");
request._headers.emplace(HttpHeaders::CONTENT_LENGTH, "0");
request.set_handler(&action);
action.on_header(&request);
action.handle(&request);
rapidjson::Document doc;
doc.Parse(k_response_str.c_str());
EXPECT_STREQ("Success", doc["Status"].GetString());
}
TEST_F(StreamLoadActionTest, put_fail) {
StreamLoadAction action(&_env);
HttpRequest request(_evhttp_req);
struct evhttp_request ev_req;
ev_req.remote_host = nullptr;
request._ev_req = &ev_req;
request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo=");
request._headers.emplace(HttpHeaders::CONTENT_LENGTH, "16");
Status status = Status::InternalError("TestFail");
status.to_thrift(&k_stream_load_put_result.status);
request.set_handler(&action);
action.on_header(&request);
action.handle(&request);
rapidjson::Document doc;
doc.Parse(k_response_str.c_str());
EXPECT_STREQ("Fail", doc["Status"].GetString());
}
TEST_F(StreamLoadActionTest, commit_fail) {
StreamLoadAction action(&_env);
HttpRequest request(_evhttp_req);
struct evhttp_request ev_req;
ev_req.remote_host = nullptr;
request._ev_req = &ev_req;
request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo=");
request._headers.emplace(HttpHeaders::CONTENT_LENGTH, "16");
Status status = Status::InternalError("TestFail");
status.to_thrift(&k_stream_load_commit_result.status);
request.set_handler(&action);
action.on_header(&request);
action.handle(&request);
rapidjson::Document doc;
doc.Parse(k_response_str.c_str());
EXPECT_STREQ("Fail", doc["Status"].GetString());
}
TEST_F(StreamLoadActionTest, begin_fail) {
StreamLoadAction action(&_env);
HttpRequest request(_evhttp_req);
struct evhttp_request ev_req;
ev_req.remote_host = nullptr;
request._ev_req = &ev_req;
request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo=");
request._headers.emplace(HttpHeaders::CONTENT_LENGTH, "16");
Status status = Status::InternalError("TestFail");
status.to_thrift(&k_stream_load_begin_result.status);
request.set_handler(&action);
action.on_header(&request);
action.handle(&request);
rapidjson::Document doc;
doc.Parse(k_response_str.c_str());
EXPECT_STREQ("Fail", doc["Status"].GetString());
}
#if 0
TEST_F(StreamLoadActionTest, receive_failed) {
StreamLoadAction action(&_env);
HttpRequest request(_evhttp_req);
request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo=");
request._headers.emplace(HttpHeaders::TRANSFER_ENCODING, "chunked");
request.set_handler(&action);
action.on_header(&request);
action.handle(&request);
rapidjson::Document doc;
doc.Parse(k_response_str.c_str());
EXPECT_STREQ("Fail", doc["Status"].GetString());
}
#endif
TEST_F(StreamLoadActionTest, plan_fail) {
StreamLoadAction action(&_env);
HttpRequest request(_evhttp_req);
struct evhttp_request ev_req;
ev_req.remote_host = nullptr;
request._ev_req = &ev_req;
request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo=");
request._headers.emplace(HttpHeaders::CONTENT_LENGTH, "16");
k_stream_load_plan_status = Status::InternalError("TestFail");
request.set_handler(&action);
action.on_header(&request);
action.handle(&request);
rapidjson::Document doc;
doc.Parse(k_response_str.c_str());
EXPECT_STREQ("Fail", doc["Status"].GetString());
}
} // namespace doris


@@ -1,66 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/routine_load/kafka_consumer_pipe.h"
#include <gtest/gtest.h>
namespace doris {
class KafkaConsumerPipeTest : public testing::Test {
public:
KafkaConsumerPipeTest() {}
virtual ~KafkaConsumerPipeTest() {}
void SetUp() override {}
void TearDown() override {}
private:
};
TEST_F(KafkaConsumerPipeTest, append_read) {
KafkaConsumerPipe k_pipe(1024 * 1024, 64 * 1024);
std::string msg1 = "i have a dream";
std::string msg2 = "This is from kafka";
Status st;
st = k_pipe.append_with_line_delimiter(msg1.c_str(), msg1.length());
EXPECT_TRUE(st.ok());
st = k_pipe.append_with_line_delimiter(msg2.c_str(), msg2.length());
EXPECT_TRUE(st.ok());
st = k_pipe.finish();
EXPECT_TRUE(st.ok());
char buf[1024];
int64_t data_size = 1024;
int64_t read_bytes = 0;
bool eof = false;
st = k_pipe.read((uint8_t*)buf, data_size, &read_bytes, &eof);
EXPECT_TRUE(st.ok());
EXPECT_EQ(read_bytes, msg1.length() + msg2.length() + 2);
EXPECT_EQ(eof, false);
data_size = 1024;
st = k_pipe.read((uint8_t*)buf, data_size, &read_bytes, &eof);
EXPECT_TRUE(st.ok());
EXPECT_EQ(read_bytes, 0);
EXPECT_EQ(eof, true);
}
} // namespace doris


@@ -1,261 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/stream_load/stream_load_pipe.h"
#include <gtest/gtest.h>
#include <thread>
namespace doris {
class StreamLoadPipeTest : public testing::Test {
public:
StreamLoadPipeTest() {}
virtual ~StreamLoadPipeTest() {}
void SetUp() override {}
};
TEST_F(StreamLoadPipeTest, append_buffer) {
StreamLoadPipe pipe(66, 64);
auto appender = [&pipe] {
int k = 0;
for (int i = 0; i < 2; ++i) {
auto byte_buf = ByteBuffer::allocate(64);
char buf[64];
for (int j = 0; j < 64; ++j) {
buf[j] = '0' + (k++ % 10);
}
byte_buf->put_bytes(buf, 64);
byte_buf->flip();
pipe.append(byte_buf);
}
pipe.finish();
};
std::thread t1(appender);
char buf[256];
int64_t buf_len = 256;
int64_t read_bytes = 0;
bool eof = false;
auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
EXPECT_TRUE(st.ok());
EXPECT_EQ(128, read_bytes);
EXPECT_FALSE(eof);
for (int i = 0; i < 128; ++i) {
EXPECT_EQ('0' + (i % 10), buf[i]);
}
st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
EXPECT_TRUE(st.ok());
EXPECT_EQ(0, read_bytes);
EXPECT_TRUE(eof);
t1.join();
}
TEST_F(StreamLoadPipeTest, append_bytes) {
StreamLoadPipe pipe(66, 64);
auto appender = [&pipe] {
for (int i = 0; i < 128; ++i) {
char buf = '0' + (i % 10);
pipe.append(&buf, 1);
}
pipe.finish();
};
std::thread t1(appender);
char buf[256];
int64_t buf_len = 256;
int64_t read_bytes = 0;
bool eof = false;
auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
EXPECT_TRUE(st.ok());
EXPECT_EQ(128, read_bytes);
EXPECT_FALSE(eof);
for (int i = 0; i < 128; ++i) {
EXPECT_EQ('0' + (i % 10), buf[i]);
}
st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
EXPECT_TRUE(st.ok());
EXPECT_EQ(0, read_bytes);
EXPECT_TRUE(eof);
t1.join();
}
TEST_F(StreamLoadPipeTest, append_bytes2) {
StreamLoadPipe pipe(66, 64);
auto appender = [&pipe] {
for (int i = 0; i < 128; ++i) {
char buf = '0' + (i % 10);
pipe.append(&buf, 1);
}
pipe.finish();
};
std::thread t1(appender);
char buf[128];
int64_t buf_len = 62;
int64_t read_bytes = 0;
bool eof = false;
auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
EXPECT_TRUE(st.ok());
EXPECT_EQ(62, read_bytes);
EXPECT_FALSE(eof);
for (int i = 0; i < 62; ++i) {
EXPECT_EQ('0' + (i % 10), buf[i]);
}
for (int i = 62; i < 128; ++i) {
char ch;
buf_len = 1;
auto st = pipe.read((uint8_t*)&ch, buf_len, &read_bytes, &eof);
EXPECT_TRUE(st.ok());
EXPECT_EQ(1, read_bytes);
EXPECT_FALSE(eof);
EXPECT_EQ('0' + (i % 10), ch);
}
st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
EXPECT_TRUE(st.ok());
EXPECT_EQ(0, read_bytes);
EXPECT_TRUE(eof);
t1.join();
}
TEST_F(StreamLoadPipeTest, append_mix) {
StreamLoadPipe pipe(66, 64);
auto appender = [&pipe] {
// 10
int k = 0;
for (int i = 0; i < 10; ++i) {
char buf = '0' + (k++ % 10);
pipe.append(&buf, 1);
}
// 60
{
auto byte_buf = ByteBuffer::allocate(60);
char buf[60];
for (int j = 0; j < 60; ++j) {
buf[j] = '0' + (k++ % 10);
}
byte_buf->put_bytes(buf, 60);
byte_buf->flip();
pipe.append(byte_buf);
}
// 8
for (int i = 0; i < 8; ++i) {
char buf = '0' + (k++ % 10);
pipe.append(&buf, 1);
}
// 50
{
auto byte_buf = ByteBuffer::allocate(50);
char buf[50];
for (int j = 0; j < 50; ++j) {
buf[j] = '0' + (k++ % 10);
}
byte_buf->put_bytes(buf, 50);
byte_buf->flip();
pipe.append(byte_buf);
}
pipe.finish();
};
std::thread t1(appender);
char buf[128];
int64_t buf_len = 128;
int64_t read_bytes = 0;
bool eof = false;
auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
EXPECT_TRUE(st.ok());
EXPECT_EQ(128, read_bytes);
EXPECT_FALSE(eof);
for (int i = 0; i < 128; ++i) {
EXPECT_EQ('0' + (i % 10), buf[i]);
}
st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
EXPECT_TRUE(st.ok());
EXPECT_EQ(0, read_bytes);
EXPECT_TRUE(eof);
t1.join();
}
TEST_F(StreamLoadPipeTest, cancel) {
StreamLoadPipe pipe(66, 64);
auto appender = [&pipe] {
int k = 0;
for (int i = 0; i < 10; ++i) {
char buf = '0' + (k++ % 10);
pipe.append(&buf, 1);
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
pipe.cancel("test");
};
std::thread t1(appender);
char buf[128];
int64_t buf_len = 128;
int64_t read_bytes = 0;
bool eof = false;
auto st = pipe.read((uint8_t*)buf, buf_len, &read_bytes, &eof);
EXPECT_FALSE(st.ok());
t1.join();
}
TEST_F(StreamLoadPipeTest, close) {
StreamLoadPipe pipe(66, 64);
auto appender = [&pipe] {
int k = 0;
{
auto byte_buf = ByteBuffer::allocate(64);
char buf[64];
for (int j = 0; j < 64; ++j) {
buf[j] = '0' + (k++ % 10);
}
byte_buf->put_bytes(buf, 64);
byte_buf->flip();
pipe.append(byte_buf);
}
{
auto byte_buf = ByteBuffer::allocate(64);
char buf[64];
for (int j = 0; j < 64; ++j) {
buf[j] = '0' + (k++ % 10);
}
byte_buf->put_bytes(buf, 64);
byte_buf->flip();
auto st = pipe.append(byte_buf);
EXPECT_FALSE(st.ok());
}
};
std::thread t1(appender);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
pipe.close();
t1.join();
}
} // namespace doris


@@ -30,7 +30,6 @@
#include "runtime/exec_env.h"
#include "runtime/result_queue_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/types.h"
#include "service/brpc.h"
#include "util/brpc_client_cache.h"
@@ -344,7 +343,6 @@ public:
k_add_batch_status = Status::OK();
_env = ExecEnv::GetInstance();
_env->_master_info = new TMasterInfo();
_env->_load_stream_mgr = new LoadStreamMgr();
_env->_internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
_env->_function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
ThreadPoolBuilder("SendBatchThreadPool")
@@ -359,7 +357,6 @@ public:
void TearDown() override {
SAFE_DELETE(_env->_internal_client_cache);
SAFE_DELETE(_env->_function_client_cache);
SAFE_DELETE(_env->_load_stream_mgr);
SAFE_DELETE(_env->_master_info);
if (_server) {
_server->Stop(100);