This commit is contained in:
@ -34,7 +34,7 @@
|
||||
#include "util/debug_util.h"
|
||||
#include "util/time.h"
|
||||
#include "vec/sink/delta_writer_v2_pool.h"
|
||||
#include "vec/sink/load_stream_stub_pool.h"
|
||||
#include "vec/sink/load_stream_map_pool.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
|
||||
@ -80,7 +80,7 @@ class RuntimeQueryStatiticsMgr;
|
||||
class TMasterInfo;
|
||||
class LoadChannelMgr;
|
||||
class LoadStreamMgr;
|
||||
class LoadStreamStubPool;
|
||||
class LoadStreamMapPool;
|
||||
class StreamLoadExecutor;
|
||||
class RoutineLoadTaskExecutor;
|
||||
class SmallFileMgr;
|
||||
@ -242,7 +242,7 @@ public:
|
||||
}
|
||||
|
||||
#endif
|
||||
LoadStreamStubPool* load_stream_stub_pool() { return _load_stream_stub_pool.get(); }
|
||||
LoadStreamMapPool* load_stream_map_pool() { return _load_stream_map_pool.get(); }
|
||||
|
||||
vectorized::DeltaWriterV2Pool* delta_writer_v2_pool() { return _delta_writer_v2_pool.get(); }
|
||||
|
||||
@ -360,7 +360,7 @@ private:
|
||||
// To save meta info of external file, such as parquet footer.
|
||||
FileMetaCache* _file_meta_cache = nullptr;
|
||||
std::unique_ptr<MemTableMemoryLimiter> _memtable_memory_limiter;
|
||||
std::unique_ptr<LoadStreamStubPool> _load_stream_stub_pool;
|
||||
std::unique_ptr<LoadStreamMapPool> _load_stream_map_pool;
|
||||
std::unique_ptr<vectorized::DeltaWriterV2Pool> _delta_writer_v2_pool;
|
||||
std::shared_ptr<WalManager> _wal_manager;
|
||||
|
||||
|
||||
@ -98,7 +98,7 @@
|
||||
#include "vec/exec/scan/scanner_scheduler.h"
|
||||
#include "vec/runtime/vdata_stream_mgr.h"
|
||||
#include "vec/sink/delta_writer_v2_pool.h"
|
||||
#include "vec/sink/load_stream_stub_pool.h"
|
||||
#include "vec/sink/load_stream_map_pool.h"
|
||||
#include "vec/spill/spill_stream_manager.h"
|
||||
|
||||
#if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && \
|
||||
@ -229,7 +229,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
|
||||
_block_spill_mgr = new BlockSpillManager(store_paths);
|
||||
_group_commit_mgr = new GroupCommitMgr(this);
|
||||
_memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>();
|
||||
_load_stream_stub_pool = std::make_unique<LoadStreamStubPool>();
|
||||
_load_stream_map_pool = std::make_unique<LoadStreamMapPool>();
|
||||
_delta_writer_v2_pool = std::make_unique<vectorized::DeltaWriterV2Pool>();
|
||||
_wal_manager = WalManager::create_shared(this, config::group_commit_wal_path);
|
||||
_spill_stream_mgr = new vectorized::SpillStreamManager(spill_store_paths);
|
||||
@ -552,7 +552,7 @@ void ExecEnv::destroy() {
|
||||
_stream_load_executor.reset();
|
||||
_memtable_memory_limiter.reset();
|
||||
_delta_writer_v2_pool.reset();
|
||||
_load_stream_stub_pool.reset();
|
||||
_load_stream_map_pool.reset();
|
||||
SAFE_STOP(_storage_engine);
|
||||
SAFE_STOP(_spill_stream_mgr);
|
||||
SAFE_SHUTDOWN(_buffered_reader_prefetch_thread_pool);
|
||||
|
||||
@ -15,21 +15,22 @@
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "vec/sink/load_stream_stub_pool.h"
|
||||
#include "vec/sink/load_stream_map_pool.h"
|
||||
|
||||
#include "util/debug_points.h"
|
||||
#include "vec/sink/load_stream_stub.h"
|
||||
|
||||
namespace doris {
|
||||
class TExpr;
|
||||
|
||||
LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use,
|
||||
LoadStreamStubPool* pool)
|
||||
LoadStreamMapPool* pool)
|
||||
: _load_id(load_id),
|
||||
_src_id(src_id),
|
||||
_num_streams(num_streams),
|
||||
_use_cnt(num_use),
|
||||
_pool(pool) {
|
||||
_pool(pool),
|
||||
_tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
|
||||
_enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {
|
||||
DCHECK(num_streams > 0) << "stream num should be greater than 0";
|
||||
DCHECK(num_use > 0) << "use num should be greater than 0";
|
||||
}
|
||||
@ -41,10 +42,9 @@ std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id) {
|
||||
return streams;
|
||||
}
|
||||
streams = std::make_shared<Streams>();
|
||||
auto schema_map = std::make_shared<IndexToTabletSchema>();
|
||||
auto mow_map = std::make_shared<IndexToEnableMoW>();
|
||||
for (int i = 0; i < _num_streams; i++) {
|
||||
streams->emplace_back(new LoadStreamStub(_load_id, _src_id, schema_map, mow_map));
|
||||
streams->emplace_back(new LoadStreamStub(_load_id, _src_id, _tablet_schema_for_index,
|
||||
_enable_unique_mow_for_index));
|
||||
}
|
||||
_streams_for_node[dst_id] = streams;
|
||||
return streams;
|
||||
@ -103,11 +103,11 @@ Status LoadStreamMap::close_load() {
|
||||
});
|
||||
}
|
||||
|
||||
LoadStreamStubPool::LoadStreamStubPool() = default;
|
||||
LoadStreamMapPool::LoadStreamMapPool() = default;
|
||||
|
||||
LoadStreamStubPool::~LoadStreamStubPool() = default;
|
||||
std::shared_ptr<LoadStreamMap> LoadStreamStubPool::get_or_create(UniqueId load_id, int64_t src_id,
|
||||
int num_streams, int num_use) {
|
||||
LoadStreamMapPool::~LoadStreamMapPool() = default;
|
||||
std::shared_ptr<LoadStreamMap> LoadStreamMapPool::get_or_create(UniqueId load_id, int64_t src_id,
|
||||
int num_streams, int num_use) {
|
||||
std::lock_guard<std::mutex> lock(_mutex);
|
||||
std::shared_ptr<LoadStreamMap> streams = _pool[load_id];
|
||||
if (streams != nullptr) {
|
||||
@ -118,7 +118,7 @@ std::shared_ptr<LoadStreamMap> LoadStreamStubPool::get_or_create(UniqueId load_i
|
||||
return streams;
|
||||
}
|
||||
|
||||
void LoadStreamStubPool::erase(UniqueId load_id) {
|
||||
void LoadStreamMapPool::erase(UniqueId load_id) {
|
||||
std::lock_guard<std::mutex> lock(_mutex);
|
||||
_pool.erase(load_id);
|
||||
}
|
||||
@ -63,19 +63,20 @@
|
||||
#include "vec/core/block.h"
|
||||
#include "vec/data_types/data_type.h"
|
||||
#include "vec/exprs/vexpr_fwd.h"
|
||||
#include "vec/sink/load_stream_stub.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
class LoadStreamStub;
|
||||
|
||||
class LoadStreamStubPool;
|
||||
class LoadStreamMapPool;
|
||||
|
||||
using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
|
||||
|
||||
class LoadStreamMap {
|
||||
public:
|
||||
LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use,
|
||||
LoadStreamStubPool* pool);
|
||||
LoadStreamMapPool* pool);
|
||||
|
||||
std::shared_ptr<Streams> get_or_create(int64_t dst_id);
|
||||
|
||||
@ -103,17 +104,19 @@ private:
|
||||
std::atomic<int> _use_cnt;
|
||||
std::mutex _mutex;
|
||||
std::unordered_map<int64_t, std::shared_ptr<Streams>> _streams_for_node;
|
||||
LoadStreamStubPool* _pool = nullptr;
|
||||
LoadStreamMapPool* _pool = nullptr;
|
||||
std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index;
|
||||
std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
|
||||
|
||||
std::mutex _tablets_to_commit_mutex;
|
||||
std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_to_commit;
|
||||
};
|
||||
|
||||
class LoadStreamStubPool {
|
||||
class LoadStreamMapPool {
|
||||
public:
|
||||
LoadStreamStubPool();
|
||||
LoadStreamMapPool();
|
||||
|
||||
~LoadStreamStubPool();
|
||||
~LoadStreamMapPool();
|
||||
|
||||
std::shared_ptr<LoadStreamMap> get_or_create(UniqueId load_id, int64_t src_id, int num_streams,
|
||||
int num_use);
|
||||
@ -250,7 +250,7 @@ Status LoadStreamStub::get_schema(const std::vector<PTabletID>& tablets) {
|
||||
PStreamHeader header;
|
||||
*header.mutable_load_id() = _load_id;
|
||||
header.set_src_id(_src_id);
|
||||
header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
|
||||
header.set_opcode(doris::PStreamHeader::GET_SCHEMA);
|
||||
std::ostringstream oss;
|
||||
oss << "fetching tablet schema from stream " << _stream_id
|
||||
<< ", load id: " << print_id(_load_id) << ", tablet id:";
|
||||
|
||||
@ -31,8 +31,6 @@
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "util/doris_metrics.h"
|
||||
#include "vec/sink/delta_writer_v2_pool.h"
|
||||
#include "vec/sink/load_stream_stub.h"
|
||||
#include "vec/sink/load_stream_stub_pool.h"
|
||||
|
||||
namespace doris {
|
||||
class TExpr;
|
||||
|
||||
@ -50,8 +50,8 @@
|
||||
#include "util/uid_util.h"
|
||||
#include "vec/core/block.h"
|
||||
#include "vec/sink/delta_writer_v2_pool.h"
|
||||
#include "vec/sink/load_stream_map_pool.h"
|
||||
#include "vec/sink/load_stream_stub.h"
|
||||
#include "vec/sink/load_stream_stub_pool.h"
|
||||
#include "vec/sink/vtablet_block_convertor.h"
|
||||
#include "vec/sink/vtablet_finder.h"
|
||||
|
||||
@ -98,7 +98,7 @@ Status VTabletWriterV2::_incremental_open_streams(
|
||||
tablet.set_partition_id(partition->id);
|
||||
tablet.set_index_id(index.index_id);
|
||||
tablet.set_tablet_id(tablet_id);
|
||||
if (!_streams_for_node->contains(node)) {
|
||||
if (!_load_stream_map->contains(node)) {
|
||||
new_backends.insert(node);
|
||||
}
|
||||
_tablets_for_node[node].emplace(tablet_id, tablet);
|
||||
@ -112,7 +112,7 @@ Status VTabletWriterV2::_incremental_open_streams(
|
||||
}
|
||||
}
|
||||
for (int64_t dst_id : new_backends) {
|
||||
auto streams = _streams_for_node->get_or_create(dst_id);
|
||||
auto streams = _load_stream_map->get_or_create(dst_id);
|
||||
RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
|
||||
}
|
||||
return Status::OK();
|
||||
@ -240,7 +240,7 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
|
||||
} else {
|
||||
_delta_writer_for_tablet = std::make_shared<DeltaWriterV2Map>(_load_id);
|
||||
}
|
||||
_streams_for_node = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
|
||||
_load_stream_map = ExecEnv::GetInstance()->load_stream_map_pool()->get_or_create(
|
||||
_load_id, _backend_id, _stream_per_node, _num_local_sink);
|
||||
return Status::OK();
|
||||
}
|
||||
@ -261,7 +261,7 @@ Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) {
|
||||
|
||||
Status VTabletWriterV2::_open_streams() {
|
||||
for (auto& [dst_id, _] : _tablets_for_node) {
|
||||
auto streams = _streams_for_node->get_or_create(dst_id);
|
||||
auto streams = _load_stream_map->get_or_create(dst_id);
|
||||
RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
|
||||
}
|
||||
return Status::OK();
|
||||
@ -358,7 +358,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id,
|
||||
tablet.set_index_id(index_id);
|
||||
tablet.set_tablet_id(tablet_id);
|
||||
_tablets_for_node[node_id].emplace(tablet_id, tablet);
|
||||
streams.emplace_back(_streams_for_node->at(node_id)->at(_stream_index));
|
||||
streams.emplace_back(_load_stream_map->at(node_id)->at(_stream_index));
|
||||
RETURN_IF_ERROR(streams[0]->wait_for_schema(partition_id, index_id, tablet_id));
|
||||
}
|
||||
_stream_index = (_stream_index + 1) % _stream_per_node;
|
||||
@ -467,13 +467,13 @@ Status VTabletWriterV2::_cancel(Status status) {
|
||||
_delta_writer_for_tablet->cancel(status);
|
||||
_delta_writer_for_tablet.reset();
|
||||
}
|
||||
if (_streams_for_node) {
|
||||
_streams_for_node->for_each([status](int64_t dst_id, const Streams& streams) {
|
||||
if (_load_stream_map) {
|
||||
_load_stream_map->for_each([status](int64_t dst_id, const Streams& streams) {
|
||||
for (auto& stream : streams) {
|
||||
stream->cancel(status);
|
||||
}
|
||||
});
|
||||
_streams_for_node->release();
|
||||
_load_stream_map->release();
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
@ -539,29 +539,28 @@ Status VTabletWriterV2::close(Status exec_status) {
|
||||
}
|
||||
|
||||
_calc_tablets_to_commit();
|
||||
const bool is_last_sink = _streams_for_node->release();
|
||||
const bool is_last_sink = _load_stream_map->release();
|
||||
LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << is_last_sink
|
||||
<< ", load_id=" << print_id(_load_id);
|
||||
|
||||
// send CLOSE_LOAD and close_wait on all streams
|
||||
if (is_last_sink) {
|
||||
RETURN_IF_ERROR(_streams_for_node->close_load());
|
||||
RETURN_IF_ERROR(_load_stream_map->close_load());
|
||||
SCOPED_TIMER(_close_load_timer);
|
||||
RETURN_IF_ERROR(_streams_for_node->for_each_st(
|
||||
[this](int64_t dst_id, const Streams& streams) -> Status {
|
||||
for (auto& stream : streams) {
|
||||
int64_t remain_ms =
|
||||
static_cast<int64_t>(_state->execution_timeout()) * 1000 -
|
||||
_timeout_watch.elapsed_time() / 1000 / 1000;
|
||||
if (remain_ms <= 0) {
|
||||
LOG(WARNING) << "load timed out before close waiting, load_id="
|
||||
<< print_id(_load_id);
|
||||
return Status::TimedOut("load timed out before close waiting");
|
||||
}
|
||||
RETURN_IF_ERROR(stream->close_wait(_state, remain_ms));
|
||||
}
|
||||
return Status::OK();
|
||||
}));
|
||||
RETURN_IF_ERROR(_load_stream_map->for_each_st([this](int64_t dst_id,
|
||||
const Streams& streams) -> Status {
|
||||
for (auto& stream : streams) {
|
||||
int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 -
|
||||
_timeout_watch.elapsed_time() / 1000 / 1000;
|
||||
if (remain_ms <= 0) {
|
||||
LOG(WARNING) << "load timed out before close waiting, load_id="
|
||||
<< print_id(_load_id);
|
||||
return Status::TimedOut("load timed out before close waiting");
|
||||
}
|
||||
RETURN_IF_ERROR(stream->close_wait(_state, remain_ms));
|
||||
}
|
||||
return Status::OK();
|
||||
}));
|
||||
}
|
||||
|
||||
// calculate and submit commit info
|
||||
@ -570,7 +569,7 @@ Status VTabletWriterV2::close(Status exec_status) {
|
||||
std::unordered_map<int64_t, Status> failed_reason;
|
||||
std::vector<TTabletCommitInfo> tablet_commit_infos;
|
||||
|
||||
_streams_for_node->for_each([&](int64_t dst_id, const Streams& streams) {
|
||||
_load_stream_map->for_each([&](int64_t dst_id, const Streams& streams) {
|
||||
std::unordered_set<int64_t> known_tablets;
|
||||
for (const auto& stream : streams) {
|
||||
for (auto [tablet_id, reason] : stream->failed_tablets()) {
|
||||
@ -645,7 +644,7 @@ void VTabletWriterV2::_calc_tablets_to_commit() {
|
||||
}
|
||||
LOG(WARNING) << msg;
|
||||
}
|
||||
_streams_for_node->save_tablets_to_commit(dst_id, tablets_to_commit);
|
||||
_load_stream_map->save_tablets_to_commit(dst_id, tablets_to_commit);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -217,7 +217,7 @@ private:
|
||||
std::unordered_map<int64_t, std::unordered_map<int64_t, PTabletID>> _tablets_for_node;
|
||||
std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node;
|
||||
|
||||
std::shared_ptr<LoadStreamMap> _streams_for_node;
|
||||
std::shared_ptr<LoadStreamMap> _load_stream_map;
|
||||
|
||||
size_t _stream_index = 0;
|
||||
std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet;
|
||||
|
||||
@ -14,22 +14,21 @@
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
#include "vec/sink/load_stream_stub_pool.h"
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "vec/sink/load_stream_map_pool.h"
|
||||
#include "vec/sink/load_stream_stub.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
class LoadStreamStubPoolTest : public testing::Test {
|
||||
class LoadStreamMapPoolTest : public testing::Test {
|
||||
public:
|
||||
LoadStreamStubPoolTest() = default;
|
||||
virtual ~LoadStreamStubPoolTest() = default;
|
||||
LoadStreamMapPoolTest() = default;
|
||||
virtual ~LoadStreamMapPoolTest() = default;
|
||||
};
|
||||
|
||||
TEST_F(LoadStreamStubPoolTest, test) {
|
||||
LoadStreamStubPool pool;
|
||||
TEST_F(LoadStreamMapPoolTest, test) {
|
||||
LoadStreamMapPool pool;
|
||||
int64_t src_id = 100;
|
||||
PUniqueId load_id;
|
||||
load_id.set_lo(1);
|
||||
Reference in New Issue
Block a user